http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java index d972135..24166de 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java @@ -21,11 +21,10 @@ import backtype.storm.Config; import java.io.UnsupportedEncodingException; import java.util.Map; - public class ZookeeperAuthInfo { public String scheme; public byte[] payload = null; - + public ZookeeperAuthInfo(Map conf) { String scheme = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME); String payload = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD); @@ -34,9 +33,9 @@ public class ZookeeperAuthInfo { scheme = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME); payload = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD); } - if(scheme!=null) { + if (scheme != null) { this.scheme = scheme; - if(payload != null) { + if (payload != null) { try { this.payload = payload.getBytes("UTF-8"); } catch (UnsupportedEncodingException ex) { @@ -45,7 +44,7 @@ public class ZookeeperAuthInfo { } } } - + public ZookeeperAuthInfo(String scheme, byte[] payload) { this.scheme = scheme; this.payload = payload;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java index 08a763a..f0e8f9d 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java @@ -27,58 +27,58 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZookeeperServerCnxnFactory { - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperServerCnxnFactory.class); - int _port; - NIOServerCnxnFactory _factory; - - public ZookeeperServerCnxnFactory(int port, int maxClientCnxns) { - //port range - int max; - if (port <= 0) { - _port = 2000; - max = 65535; - } else { - _port = port; - max = port; - } + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperServerCnxnFactory.class); + int _port; + NIOServerCnxnFactory _factory; - try { - _factory = new NIOServerCnxnFactory(); - } catch (IOException e) { - _port = 0; - _factory = null; - e.printStackTrace(); - throw new RuntimeException(e.getMessage()); - } - - //look for available port - for (; _port <= max; _port++) { - try { - _factory.configure(new InetSocketAddress(_port), maxClientCnxns); - LOG.debug("Zookeeper server successfully binded at port "+_port); - break; - } catch (BindException e1) { - } catch (IOException e2) { - _port = 0; - _factory = null; - e2.printStackTrace(); - throw new RuntimeException(e2.getMessage()); - } - } + public ZookeeperServerCnxnFactory(int port, int maxClientCnxns) { + // port range + int max; + if (port <= 0) { + _port = 2000; + max = 65535; + } else { + _port = port; + max = port; + } - if (_port > max) { - _port = 0; - _factory = null; - LOG.error("Failed to find a port for Zookeeper"); - throw new RuntimeException("No port is available to launch an inprocess zookeeper."); - } - } - - public int port() { - return _port; - } - - public NIOServerCnxnFactory factory() { - return _factory; - } + try { + _factory = new NIOServerCnxnFactory(); + } catch (IOException e) { + _port = 0; + _factory = null; + e.printStackTrace(); + throw new RuntimeException(e.getMessage()); + } + + // look for available port + for (; _port <= max; _port++) { + try { + _factory.configure(new InetSocketAddress(_port), maxClientCnxns); + LOG.debug("Zookeeper server successfully binded at port " + _port); + break; + } catch (BindException e1) { + } catch (IOException e2) { + _port = 0; + _factory = null; + e2.printStackTrace(); + throw new RuntimeException(e2.getMessage()); + } + } + + if (_port > max) { + _port = 0; + _factory = null; + LOG.error("Failed to find a port for Zookeeper"); + throw new RuntimeException("No port is available to launch an inprocess zookeeper."); + } + } + + public int port() { + return _port; + } + + public NIOServerCnxnFactory factory() { + return _factory; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java index c7199c6..7a1e18a 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java @@ -22,17 +22,17 @@ import com.lmax.disruptor.WaitStrategy; public abstract class AbstractSequencerExt extends AbstractSequencer { private static boolean waitSleep = true; - + public static boolean isWaitSleep() { return waitSleep; } - + public static void setWaitSleep(boolean waitSleep) { AbstractSequencerExt.waitSleep = waitSleep; } - + public AbstractSequencerExt(int bufferSize, WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java index cb5d7f9..2bcfdec 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java @@ -33,19 +33,19 @@ import com.lmax.disruptor.util.Util; * Suitable for use for sequencing across multiple publisher threads. */ public class MultiProducerSequencer extends AbstractSequencerExt { - + private static final Unsafe UNSAFE = Util.getUnsafe(); private static final long BASE = UNSAFE.arrayBaseOffset(int[].class); private static final long SCALE = UNSAFE.arrayIndexScale(int[].class); - + private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); - + // availableBuffer tracks the state of each ringbuffer slot // see below for more details on the approach private final int[] availableBuffer; private final int indexMask; private final int indexShift; - + /** * Construct a Sequencer with the selected wait strategy and buffer size. * @@ -59,7 +59,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt { indexShift = Util.log2(bufferSize); initialiseAvailableBuffer(); } - + /** * @see Sequencer#hasAvailableCapacity(int) */ @@ -67,23 +67,23 @@ public class MultiProducerSequencer extends AbstractSequencerExt { public boolean hasAvailableCapacity(final int requiredCapacity) { return hasAvailableCapacity(gatingSequences, requiredCapacity, cursor.get()); } - + private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int requiredCapacity, long cursorValue) { long wrapPoint = (cursorValue + requiredCapacity) - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); - + if (wrapPoint > cachedGatingSequence || cachedGatingSequence > cursorValue) { long minSequence = Util.getMinimumSequence(gatingSequences, cursorValue); gatingSequenceCache.set(minSequence); - + if (wrapPoint > minSequence) { return false; } } - + return true; } - + /** * @see Sequencer#claim(long) */ @@ -91,7 +91,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt { public void claim(long sequence) { cursor.set(sequence); } - + /** * @see Sequencer#next() */ @@ -99,7 +99,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt { public long next() { return next(1); } - + /** * @see Sequencer#next(int) */ @@ -108,20 +108,20 @@ public class MultiProducerSequencer extends AbstractSequencerExt { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } - + long current; long next; - + do { current = cursor.get(); next = current + n; - + long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); - + if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); - + if (wrapPoint > gatingSequence) { if (AbstractSequencerExt.isWaitSleep()) { try { @@ -133,16 +133,16 @@ public class MultiProducerSequencer extends AbstractSequencerExt { } continue; } - + gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { break; } } while (true); - + return next; } - + /** * @see Sequencer#tryNext() */ @@ -150,7 +150,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt { public long tryNext() throws InsufficientCapacityException { return tryNext(1); } - + /** * @see Sequencer#tryNext(int) */ @@ -159,22 +159,22 @@ public class MultiProducerSequencer extends AbstractSequencerExt { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } - + long current; long next; - + do { current = cursor.get(); next = current + n; - + if (!hasAvailableCapacity(gatingSequences, n, current)) { throw InsufficientCapacityException.INSTANCE; } } while (!cursor.compareAndSet(current, next)); - + return next; } - + /** * @see Sequencer#remainingCapacity() */ @@ -184,15 +184,15 @@ public class MultiProducerSequencer extends AbstractSequencerExt { long produced = cursor.get(); return getBufferSize() - (produced - consumed); } - + private void initialiseAvailableBuffer() { for (int i = availableBuffer.length - 1; i != 0; i--) { setAvailableBufferValue(i, -1); } - + setAvailableBufferValue(0, -1); } - + /** * @see Sequencer#publish(long) */ @@ -201,7 +201,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt { setAvailable(sequence); waitStrategy.signalAllWhenBlocking(); } - + /** * @see Sequencer#publish(long, long) */ @@ -212,7 +212,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt { } waitStrategy.signalAllWhenBlocking(); } - + /** * The below methods work on the availableBuffer flag. * @@ -229,12 +229,12 @@ public class MultiProducerSequencer extends AbstractSequencerExt { private void setAvailable(final long sequence) { setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence)); } - + private void setAvailableBufferValue(int index, int flag) { long bufferAddress = (index * SCALE) + BASE; UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag); } - + /** * @see Sequencer#isAvailable(long) */ @@ -245,7 +245,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt { long bufferAddress = (index * SCALE) + BASE; return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag; } - + @Override public long getHighestPublishedSequence(long lowerBound, long availableSequence) { for (long sequence = lowerBound; sequence <= availableSequence; sequence++) { @@ -253,14 +253,14 @@ public class MultiProducerSequencer extends AbstractSequencerExt { return sequence - 1; } } - + return availableSequence; } - + private int calculateAvailabilityFlag(final long sequence) { return (int) (sequence >>> indexShift); } - + private int calculateIndex(final long sequence) { return ((int) sequence) & indexMask; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java index da124f0..763294b 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java @@ -43,12 +43,12 @@ import backtype.storm.utils.disruptor.SingleProducerSequencer; */ public class RingBuffer<E> implements Cursored, DataProvider<E> { public static final long INITIAL_CURSOR_VALUE = -1L; - + private final int indexMask; private final Object[] entries; private final int bufferSize; private final Sequencer sequencer; - + /** * Construct a RingBuffer with the full option set. * @@ -59,19 +59,19 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); - + if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } - + this.indexMask = bufferSize - 1; this.entries = new Object[sequencer.getBufferSize()]; fill(eventFactory); } - + /** * Create a new multiple producer RingBuffer with the specified wait strategy. * @@ -83,10 +83,10 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { */ public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy); - + return new RingBuffer<E>(factory, sequencer); } - + /** * Create a new multiple producer RingBuffer using the default wait strategy {@link BlockingWaitStrategy}. * @@ -98,7 +98,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize) { return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy()); } - + /** * Create a new single producer RingBuffer with the specified wait strategy. * @@ -110,10 +110,10 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { */ public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) { SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); - + return new RingBuffer<E>(factory, sequencer); } - + /** * Create a new single producer RingBuffer using the default wait strategy {@link BlockingWaitStrategy}. * @@ -125,7 +125,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize) { return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy()); } - + /** * Create a new Ring Buffer with the specified producer type (SINGLE or MULTI) * @@ -145,7 +145,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { throw new IllegalStateException(producerType.toString()); } } - + /** * <p> * Get the event for a given sequence in the RingBuffer. @@ -168,7 +168,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public E get(long sequence) { return (E) entries[(int) sequence & indexMask]; } - + /** * @deprecated Use {@link RingBuffer#get(long)} */ @@ -176,7 +176,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public E getPreallocated(long sequence) { return get(sequence); } - + /** * @deprecated Use {@link RingBuffer#get(long)} */ @@ -184,7 +184,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public E getPublished(long sequence) { return get(sequence); } - + /** * Increment and return the next sequence for the ring buffer. Calls of this method should ensure that they always publish the sequence afterward. E.g. * @@ -205,7 +205,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public long next() { return sequencer.next(); } - + /** * The same functionality as {@link RingBuffer#next()}, but allows the caller to claim the next n sequences. * @@ -216,7 +216,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public long next(int n) { return sequencer.next(n); } - + /** * <p> * Increment and return the next sequence for the ring buffer. Calls of this method should ensure that they always publish the sequence afterward. E.g. @@ -242,7 +242,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public long tryNext() throws InsufficientCapacityException { return sequencer.tryNext(); } - + /** * The same functionality as {@link RingBuffer#tryNext()}, but allows the caller to attempt to claim the next n sequences. * @@ -253,7 +253,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public long tryNext(int n) throws InsufficientCapacityException { return sequencer.tryNext(n); } - + /** * Resets the cursor to a specific value. This can be applied at any time, but it is worth not that it is a racy thing to do and should only be used in * controlled circumstances. E.g. during initialisation. @@ -265,7 +265,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.claim(sequence); sequencer.publish(sequence); } - + /** * Sets the cursor to a specific sequence and returns the preallocated entry that is stored there. This is another deliberately racy call, that should only * be done in controlled circumstances, e.g. initialisation. @@ -277,7 +277,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.claim(sequence); return get(sequence); } - + /** * Determines if a particular entry has been published. * @@ -287,7 +287,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public boolean isPublished(long sequence) { return sequencer.isAvailable(sequence); } - + /** * Add the specified gating sequences to this instance of the Disruptor. They will safely and atomically added to the list of gating sequences. * @@ -296,7 +296,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public void addGatingSequences(Sequence... gatingSequences) { sequencer.addGatingSequences(gatingSequences); } - + /** * Get the minimum sequence value from all of the gating sequences added to this ringBuffer. * @@ -305,7 +305,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public long getMinimumGatingSequence() { return sequencer.getMinimumSequence(); } - + /** * Remove the specified sequence from this ringBuffer. * @@ -315,7 +315,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public boolean removeGatingSequence(Sequence sequence) { return sequencer.removeGatingSequence(sequence); } - + /** * Create a new SequenceBarrier to be used by an EventProcessor to track which messages are available to be read from the ring buffer given a list of * sequences to track. @@ -327,7 +327,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { return sequencer.newBarrier(sequencesToTrack); } - + /** * Get the current cursor value for the ring buffer. The cursor value is the last value that was published, or the highest available sequence that can be * consumed. @@ -335,14 +335,14 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public long getCursor() { return sequencer.getCursor(); } - + /** * The size of the buffer. */ public int getBufferSize() { return bufferSize; } - + /** * Given specified <tt>requiredCapacity</tt> determines if that amount of space is available. Note, you can not assume that if this method returns * <tt>true</tt> that a call to {@link RingBuffer#next()} will not block. Especially true if this ring buffer is set up to handle multiple producers. @@ -353,7 +353,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public boolean hasAvailableCapacity(int requiredCapacity) { return sequencer.hasAvailableCapacity(requiredCapacity); } - + /** * Publishes an event to the ring buffer. It handles claiming the next sequence, getting the current (uninitialised) event from the ring buffer and * publishing the claimed sequence after translation. @@ -364,7 +364,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long sequence = sequencer.next(); translateAndPublish(translator, sequence); } - + /** * Attempts to publish an event to the ring buffer. It handles claiming the next sequence, getting the current (uninitialised) event from the ring buffer * and publishing the claimed sequence after translation. Will return false if specified capacity was not available. @@ -381,7 +381,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows one user supplied argument. * @@ -393,7 +393,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0); } - + /** * Allows one user supplied argument. * @@ -411,7 +411,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows two user supplied arguments. * @@ -424,7 +424,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0, arg1); } - + /** * Allows two user supplied arguments. * @@ -443,7 +443,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows three user supplied arguments * @@ -457,7 +457,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, arg0, arg1, arg2); } - + /** * Allows three user supplied arguments * @@ -477,7 +477,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows a variable number of user supplied arguments * @@ -489,7 +489,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long sequence = sequencer.next(); translateAndPublish(translator, sequence, args); } - + /** * Allows a variable number of user supplied arguments * @@ -507,7 +507,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Publishes multiple events to the ring buffer. It handles claiming the next sequence, getting the current (uninitialised) event from the ring buffer and * publishing the claimed sequence after translation. @@ -517,7 +517,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public void publishEvents(EventTranslator<E>[] translators) { publishEvents(translators, 0, translators.length); } - + /** * Publishes multiple events to the ring buffer. It handles claiming the next sequence, getting the current (uninitialised) event from the ring buffer and * publishing the claimed sequence after translation. @@ -531,7 +531,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence); } - + /** * Attempts to publish multiple events to the ring buffer. It handles claiming the next sequence, getting the current (uninitialised) event from the ring * buffer and publishing the claimed sequence after translation. Will return false if specified capacity was not available. @@ -542,7 +542,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public boolean tryPublishEvents(EventTranslator<E>[] translators) { return tryPublishEvents(translators, 0, translators.length); } - + /** * Attempts to publish multiple events to the ring buffer. It handles claiming the next sequence, getting the current (uninitialised) event from the ring * buffer and publishing the claimed sequence after translation. Will return false if specified capacity was not available. @@ -562,18 +562,18 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows one user supplied argument per event. * * @param translator The user specified translation for the event * @param arg0 A user supplied argument. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #publishEvents(EventTranslator[]) */ public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0) { publishEvents(translator, 0, arg0.length, arg0); } - + /** * Allows one user supplied argument per event. * @@ -588,19 +588,19 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence); } - + /** * Allows one user supplied argument. * * @param translator The user specified translation for each event * @param arg0 An array of user supplied arguments, one element per event. * @return true if the value was published, false if there was insufficient capacity. - * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #tryPublishEvents(EventTranslator[]) */ public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0) { return tryPublishEvents(translator, 0, arg0.length, arg0); } - + /** * Allows one user supplied argument. * @@ -621,19 +621,19 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows two user supplied arguments per event. * * @param translator The user specified translation for the event * @param arg0 An array of user supplied arguments, one element per event. * @param arg1 An array of user supplied arguments, one element per event. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #publishEvents(EventTranslator[]) */ public <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1) { publishEvents(translator, 0, arg0.length, arg0, arg1); } - + /** * Allows two user supplied arguments per event. * @@ -649,7 +649,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence); } - + /** * Allows two user supplied arguments per event. * @@ -657,12 +657,12 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { * @param arg0 An array of user supplied arguments, one element per event. * @param arg1 An array of user supplied arguments, one element per event. * @return true if the value was published, false if there was insufficient capacity. - * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #tryPublishEvents(EventTranslator[]) */ public <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1) { return tryPublishEvents(translator, 0, arg0.length, arg0, arg1); } - + /** * Allows two user supplied arguments per event. * @@ -684,7 +684,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows three user supplied arguments per event. * @@ -692,12 +692,12 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { * @param arg0 An array of user supplied arguments, one element per event. * @param arg1 An array of user supplied arguments, one element per event. * @param arg2 An array of user supplied arguments, one element per event. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #publishEvents(EventTranslator[]) */ public <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2) { publishEvents(translator, 0, arg0.length, arg0, arg1, arg2); } - + /** * Allows three user supplied arguments per event. * @@ -714,7 +714,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence); } - + /** * Allows three user supplied arguments per event. * @@ -723,12 +723,12 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { * @param arg1 An array of user supplied arguments, one element per event. * @param arg2 An array of user supplied arguments, one element per event. * @return true if the value was published, false if there was insufficient capacity. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #publishEvents(EventTranslator[]) */ public <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2) { return tryPublishEvents(translator, 0, arg0.length, arg0, arg1, arg2); } - + /** * Allows three user supplied arguments per event. * @@ -751,18 +751,18 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Allows a variable number of user supplied arguments per event. * * @param translator The user specified translation for the event * @param args User supplied arguments, one Object[] per event. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #publishEvents(EventTranslator[]) */ public void publishEvents(EventTranslatorVararg<E> translator, Object[]... args) { publishEvents(translator, 0, args.length, args); } - + /** * Allows a variable number of user supplied arguments per event. * @@ -777,19 +777,19 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { final long finalSequence = sequencer.next(batchSize); translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args); } - + /** * Allows a variable number of user supplied arguments per event. * * @param translator The user specified translation for the event * @param args User supplied arguments, one Object[] per event. * @return true if the value was published, false if there was insufficient capacity. - * @see #publishEvents(com.lmax.disruptor.EventTranslator[]) + * @see #publishEvents(EventTranslator[]) */ public boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args) { return tryPublishEvents(translator, 0, args.length, args); } - + /** * Allows a variable number of user supplied arguments per event. * @@ -810,7 +810,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { return false; } } - + /** * Publish the specified sequence. This action marks this particular message as being available to be read. * @@ -819,7 +819,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public void publish(long sequence) { sequencer.publish(sequence); } - + /** * Publish the specified sequences. This action marks these particular messages as being available to be read. * @@ -830,7 +830,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public void publish(long lo, long hi) { sequencer.publish(lo, hi); } - + /** * Get the remaining capacity for this ringBuffer. * @@ -839,49 +839,51 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { public long remainingCapacity() { return sequencer.remainingCapacity(); } - + private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(translators, batchStartsAt, batchSize); } - + private void checkBatchSizing(int batchStartsAt, int batchSize) { if (batchStartsAt < 0 || batchSize < 0) { - throw new IllegalArgumentException("Both batchStartsAt and batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and batchSize " + batchSize); + throw new IllegalArgumentException("Both batchStartsAt and batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and batchSize " + + batchSize); } else if (batchSize > bufferSize) { throw new IllegalArgumentException("The ring buffer cannot accommodate " + batchSize + " it only has space for " + bufferSize + " entities."); } } - + private <A> void checkBounds(final A[] arg0, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(arg0, batchStartsAt, batchSize); } - + private <A, B> void checkBounds(final A[] arg0, final B[] arg1, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(arg0, batchStartsAt, batchSize); batchOverRuns(arg1, batchStartsAt, batchSize); } - + private <A, B, C> void checkBounds(final A[] arg0, final B[] arg1, final C[] arg2, final int batchStartsAt, final int batchSize) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(arg0, batchStartsAt, batchSize); batchOverRuns(arg1, batchStartsAt, batchSize); batchOverRuns(arg2, batchStartsAt, batchSize); } - + private void checkBounds(final int batchStartsAt, final int batchSize, final Object[][] args) { checkBatchSizing(batchStartsAt, batchSize); batchOverRuns(args, batchStartsAt, batchSize); } - + private <A> void batchOverRuns(final A[] arg0, final int batchStartsAt, final int batchSize) { if (batchStartsAt + batchSize > arg0.length) { - throw new IllegalArgumentException("A batchSize of: " + batchSize + " with batchStatsAt of: " + batchStartsAt + " will overrun the available number of arguments: " + (arg0.length - batchStartsAt)); + throw new IllegalArgumentException("A batchSize of: " + batchSize + " with batchStatsAt of: " + batchStartsAt + + " will overrun the available number of arguments: " + (arg0.length - batchStartsAt)); } } - + private void translateAndPublish(EventTranslator<E> translator, long sequence) { try { translator.translateTo(get(sequence), sequence); @@ -889,7 +891,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(sequence); } } - + private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) { try { translator.translateTo(get(sequence), sequence, arg0); @@ -897,7 +899,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(sequence); } } - + private <A, B> void translateAndPublish(EventTranslatorTwoArg<E, A, B> translator, long sequence, A arg0, B arg1) { try { translator.translateTo(get(sequence), sequence, arg0, arg1); @@ -905,7 +907,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(sequence); } } - + private <A, B, C> void translateAndPublish(EventTranslatorThreeArg<E, A, B, C> translator, long sequence, A arg0, B arg1, C arg2) { try { translator.translateTo(get(sequence), sequence, arg0, arg1, arg2); @@ -913,7 +915,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(sequence); } } - + private void translateAndPublish(EventTranslatorVararg<E> translator, long sequence, Object... args) { try { translator.translateTo(get(sequence), sequence, args); @@ -921,7 +923,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(sequence); } } - + private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt, final int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { @@ -935,8 +937,9 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(initialSequence, finalSequence); } } - - private <A> void translateAndPublishBatch(final EventTranslatorOneArg<E, A> translator, final A[] arg0, int batchStartsAt, final int batchSize, final long finalSequence) { + + private <A> void translateAndPublishBatch(final EventTranslatorOneArg<E, A> translator, final A[] arg0, int batchStartsAt, final int batchSize, + final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; @@ -948,8 +951,9 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(initialSequence, finalSequence); } } - - private <A, B> void translateAndPublishBatch(final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0, final B[] arg1, int batchStartsAt, int batchSize, final long finalSequence) { + + private <A, B> void translateAndPublishBatch(final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0, final B[] arg1, int batchStartsAt, + int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; @@ -961,8 +965,9 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(initialSequence, finalSequence); } } - - private <A, B, C> void translateAndPublishBatch(final EventTranslatorThreeArg<E, A, B, C> translator, final A[] arg0, final B[] arg1, final C[] arg2, int batchStartsAt, final int batchSize, final long finalSequence) { + + private <A, B, C> void translateAndPublishBatch(final EventTranslatorThreeArg<E, A, B, C> translator, final A[] arg0, final B[] arg1, final C[] arg2, + int batchStartsAt, final int batchSize, final long finalSequence) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; @@ -974,8 +979,9 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(initialSequence, finalSequence); } } - - private void translateAndPublishBatch(final EventTranslatorVararg<E> translator, int batchStartsAt, final int batchSize, final long finalSequence, final Object[][] args) { + + private void translateAndPublishBatch(final EventTranslatorVararg<E> translator, int batchStartsAt, final int batchSize, final long finalSequence, + final Object[][] args) { final long initialSequence = finalSequence - (batchSize - 1); try { long sequence = initialSequence; @@ -987,7 +993,7 @@ public class RingBuffer<E> implements Cursored, DataProvider<E> { sequencer.publish(initialSequence, finalSequence); } } - + private void fill(EventFactory<E> eventFactory) { for (int i = 0; i < entries.length; i++) { entries[i] = eventFactory.newInstance(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java index 5ca2724..ad0843b 100755 --- a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java +++ b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java @@ -40,9 +40,9 @@ public class SingleProducerSequencer extends AbstractSequencerExt { /** Set to -1 as sequence starting point */ public long nextValue = -1L, cachedValue = -1L, p2, p3, p4, p5, p6, p7; } - + private final Padding pad = new Padding(); - + /** * Construct a Sequencer with the selected wait strategy and buffer size. * @@ -52,29 +52,29 @@ public class SingleProducerSequencer extends AbstractSequencerExt { public SingleProducerSequencer(int bufferSize, final WaitStrategy waitStrategy) { super(bufferSize, waitStrategy); } - + /** * @see Sequencer#hasAvailableCapacity(int) */ @Override public boolean hasAvailableCapacity(final int requiredCapacity) { long nextValue = pad.nextValue; - + long wrapPoint = (nextValue + requiredCapacity) - bufferSize; long cachedGatingSequence = pad.cachedValue; - + if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence = Util.getMinimumSequence(gatingSequences, nextValue); pad.cachedValue = minSequence; - + if (wrapPoint > minSequence) { return false; } } - + return true; } - + /** * @see Sequencer#next() */ @@ -82,7 +82,7 @@ public class SingleProducerSequencer extends AbstractSequencerExt { public long next() { return next(1); } - + /** * @see Sequencer#next(int) */ @@ -91,13 +91,13 @@ public class SingleProducerSequencer extends AbstractSequencerExt { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } - + long nextValue = pad.nextValue; - + long nextSequence = nextValue + n; long wrapPoint = nextSequence - bufferSize; long cachedGatingSequence = pad.cachedValue; - + if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) { long minSequence; while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) { @@ -110,15 +110,15 @@ public class SingleProducerSequencer extends AbstractSequencerExt { LockSupport.parkNanos(1); } } - + pad.cachedValue = minSequence; } - + pad.nextValue = nextSequence; - + return nextSequence; } - + /** * @see Sequencer#tryNext() */ @@ -126,7 +126,7 @@ public class SingleProducerSequencer extends AbstractSequencerExt { public long tryNext() throws InsufficientCapacityException { return tryNext(1); } - + /** * @see Sequencer#tryNext(int) */ @@ -135,28 +135,28 @@ public class SingleProducerSequencer extends AbstractSequencerExt { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } - + if (!hasAvailableCapacity(n)) { throw InsufficientCapacityException.INSTANCE; } - + long nextSequence = pad.nextValue += n; - + return nextSequence; } - + /** * @see Sequencer#remainingCapacity() */ @Override public long remainingCapacity() { long nextValue = pad.nextValue; - + long consumed = Util.getMinimumSequence(gatingSequences, nextValue); long produced = nextValue; return getBufferSize() - (produced - consumed); } - + /** * @see Sequencer#claim(long) */ @@ -164,7 +164,7 @@ public class SingleProducerSequencer extends AbstractSequencerExt { public void claim(long sequence) { pad.nextValue = sequence; } - + /** * @see Sequencer#publish(long) */ @@ -173,7 +173,7 @@ public class SingleProducerSequencer extends AbstractSequencerExt { cursor.set(sequence); waitStrategy.signalAllWhenBlocking(); } - + /** * @see Sequencer#publish(long, long) */ @@ -181,7 +181,7 @@ public class SingleProducerSequencer extends AbstractSequencerExt { public void publish(long lo, long hi) { publish(hi); } - + /** * @see Sequencer#isAvailable(long) */ @@ -189,7 +189,7 @@ public class SingleProducerSequencer extends AbstractSequencerExt { public boolean isAvailable(long sequence) { return sequence <= cursor.get(); } - + @Override public long getHighestPublishedSequence(long lowerBound, long availableSequence) { return availableSequence; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java index 807c5ec..4d0b713 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java @@ -59,8 +59,7 @@ public class BatchId implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } private static AtomicLong staticId = new AtomicLong(0); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java index 85dec6c..cff7b34 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java @@ -30,8 +30,7 @@ import com.alibaba.jstorm.batch.impl.CoordinatedBolt; import com.alibaba.jstorm.batch.util.BatchDef; public class BatchTopologyBuilder { - private static final Logger LOG = LoggerFactory - .getLogger(BatchTopologyBuilder.class); + private static final Logger LOG = LoggerFactory.getLogger(BatchTopologyBuilder.class); private TopologyBuilder topologyBuilder; @@ -40,17 +39,13 @@ public class BatchTopologyBuilder { public BatchTopologyBuilder(String topologyName) { topologyBuilder = new TopologyBuilder(); - spoutDeclarer = - topologyBuilder.setSpout(BatchDef.SPOUT_TRIGGER, - new BatchSpoutTrigger(), 1); + spoutDeclarer = topologyBuilder.setSpout(BatchDef.SPOUT_TRIGGER, new BatchSpoutTrigger(), 1); } public BoltDeclarer setSpout(String id, IBatchSpout spout, int paralel) { - BoltDeclarer boltDeclarer = - this.setBolt(id, (IBatchSpout) spout, paralel); - boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, - BatchDef.COMPUTING_STREAM_ID); + BoltDeclarer boltDeclarer = this.setBolt(id, (IBatchSpout) spout, paralel); + boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, BatchDef.COMPUTING_STREAM_ID); return boltDeclarer; } @@ -58,24 +53,19 @@ public class BatchTopologyBuilder { public BoltDeclarer setBolt(String id, IBasicBolt bolt, int paralel) { CoordinatedBolt coordinatedBolt = new CoordinatedBolt(bolt); - BoltDeclarer boltDeclarer = - topologyBuilder.setBolt(id, coordinatedBolt, paralel); + BoltDeclarer boltDeclarer = topologyBuilder.setBolt(id, coordinatedBolt, paralel); if (bolt instanceof IPrepareCommit) { - boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, - BatchDef.PREPARE_STREAM_ID); + boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, BatchDef.PREPARE_STREAM_ID); } if (bolt instanceof ICommitter) { - boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, - BatchDef.COMMIT_STREAM_ID); - boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, - BatchDef.REVERT_STREAM_ID); + boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, BatchDef.COMMIT_STREAM_ID); + boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, BatchDef.REVERT_STREAM_ID); } if (bolt instanceof IPostCommit) { - boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, - BatchDef.POST_STREAM_ID); + boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, BatchDef.POST_STREAM_ID); } return boltDeclarer; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java index 591f0f0..ef917b2 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java @@ -28,8 +28,7 @@ public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable { * * execute only receive trigger message * - * do emitBatch operation in execute whose streamID is - * "batch/compute-stream" + * do emitBatch operation in execute whose streamID is "batch/compute-stream" */ // void execute(Tuple input, IBasicOutputCollector collector); /** @@ -44,8 +43,7 @@ public interface IBatchSpout extends IBasicBolt, ICommitter, Serializable { /** * begin to revert batchId's data * - * If current task fails to commit batchId, it won't call revert(batchId) If - * current task fails to revert batchId, JStorm won't call revert again. + * If current task fails to commit batchId, it won't call revert(batchId) If current task fails to revert batchId, JStorm won't call revert again. * * if not transaction, it can don't care revert * http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java index 16f10da..83845ae 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java @@ -29,8 +29,7 @@ import backtype.storm.topology.FailedException; */ public interface ICommitter extends Serializable { /** - * begin to commit batchId's data, then return the commit result The - * commitResult will store into outside storage + * begin to commit batchId's data, then return the commit result The commitResult will store into outside storage * * if failed to commit, please throw FailedException * @@ -43,8 +42,7 @@ public interface ICommitter extends Serializable { /** * begin to revert batchId's data * - * If current task fails to commit batchId, it won't call revert(batchId) If - * current task fails to revert batchId, JStorm won't call revert again. + * If current task fails to commit batchId, it won't call revert(batchId) If current task fails to revert batchId, JStorm won't call revert again. * * @param id */ http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java index aa75f9e..e03d58f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java @@ -33,6 +33,5 @@ public interface IPrepareCommit { * @param id * @param collector */ - void prepareCommit(BatchId id, BasicOutputCollector collector) - throws FailedException; + void prepareCommit(BatchId id, BasicOutputCollector collector) throws FailedException; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java index 99b1915..d76a8d7 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java @@ -59,7 +59,6 @@ public class BatchSpoutMsgId implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java index c1cdae4..edb882b 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java @@ -53,8 +53,7 @@ public class BatchSpoutTrigger implements IRichSpout { /** */ private static final long serialVersionUID = 7215109169247425954L; - private static final Logger LOG = LoggerFactory - .getLogger(BatchSpoutTrigger.class); + private static final Logger LOG = LoggerFactory.getLogger(BatchSpoutTrigger.class); private LinkedBlockingQueue<BatchSpoutMsgId> batchQueue; @@ -95,9 +94,7 @@ public class BatchSpoutTrigger implements IRichSpout { BatchId.updateId(zkMsgId); } - int max_spout_pending = - JStormUtils.parseInt( - conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1); + int max_spout_pending = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1); for (int i = 0; i < max_spout_pending; i++) { BatchSpoutMsgId msgId = BatchSpoutMsgId.mkInstance(); @@ -111,8 +108,7 @@ public class BatchSpoutTrigger implements IRichSpout { } @Override - public void open(Map conf, TopologyContext context, - SpoutOutputCollector collector) { + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { batchQueue = new LinkedBlockingQueue<BatchSpoutMsgId>(); this.collector = collector; this.conf = conf; @@ -134,7 +130,7 @@ public class BatchSpoutTrigger implements IRichSpout { @Override public void close() { - zkClient.close(); + zkClient.close(); } @Override @@ -204,19 +200,16 @@ public class BatchSpoutTrigger implements IRichSpout { batchQueue.offer(msgId); if (intervalCheck.check()) { - LOG.info("Current msgId " + msgId - + ", but current commit BatchId is " + currentBatchId); + LOG.info("Current msgId " + msgId + ", but current commit BatchId is " + currentBatchId); } else { - LOG.debug("Current msgId " + msgId - + ", but current commit BatchId is " + currentBatchId); + LOG.debug("Current msgId " + msgId + ", but current commit BatchId is " + currentBatchId); } return; } String streamId = getStreamId(msgId.getBatchStatus()); - List<Integer> outTasks = - collector.emit(streamId, new Values(msgId.getBatchId()), msgId); + List<Integer> outTasks = collector.emit(streamId, new Values(msgId.getBatchId()), msgId); if (outTasks.isEmpty()) { forward(msgId); } @@ -278,8 +271,7 @@ public class BatchSpoutTrigger implements IRichSpout { forward((BatchSpoutMsgId) msgId); return; } else { - LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" - + msgId); + LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" + msgId); return; } } @@ -306,18 +298,15 @@ public class BatchSpoutTrigger implements IRichSpout { if (msgId instanceof BatchSpoutMsgId) { handleFail((BatchSpoutMsgId) msgId); } else { - LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" - + msgId); + LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" + msgId); return; } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields( - "BatchId")); - declarer.declareStream(BatchDef.PREPARE_STREAM_ID, - new Fields("BatchId")); + declarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields("BatchId")); + declarer.declareStream(BatchDef.PREPARE_STREAM_ID, new Fields("BatchId")); declarer.declareStream(BatchDef.COMMIT_STREAM_ID, new Fields("BatchId")); declarer.declareStream(BatchDef.REVERT_STREAM_ID, new Fields("BatchId")); declarer.declareStream(BatchDef.POST_STREAM_ID, new Fields("BatchId")); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java index c9bf0b5..63ca6a0 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java @@ -75,23 +75,20 @@ public class CoordinatedBolt implements IRichBolt { try { zkClient = BatchCommon.getZkClient(conf); - zkCommitPath = - BatchDef.ZK_COMMIT_DIR + BatchDef.ZK_SEPERATOR + taskId; + zkCommitPath = BatchDef.ZK_COMMIT_DIR + BatchDef.ZK_SEPERATOR + taskId; if (zkClient.node_existed(zkCommitPath, false)) { zkClient.delete_node(zkCommitPath); } zkClient.mkdirs(zkCommitPath); - LOG.info(taskName + " successfully create commit path" - + zkCommitPath); + LOG.info(taskName + " successfully create commit path" + zkCommitPath); } catch (Exception e) { LOG.error("Failed to create zk node", e); throw new RuntimeException(); } } - public void prepare(Map conf, TopologyContext context, - OutputCollector collector) { + public void prepare(Map conf, TopologyContext context, OutputCollector collector) { taskId = String.valueOf(context.getThisTaskId()); taskName = context.getThisComponentId() + "_" + context.getThisTaskId(); @@ -101,9 +98,7 @@ public class CoordinatedBolt implements IRichBolt { if (delegate instanceof ICommitter) { isCommiter = true; - commited = - new TimeCacheMap<Object, Object>( - context.maxTopologyMessageTimeout()); + commited = new TimeCacheMap<Object, Object>(context.maxTopologyMessageTimeout()); mkCommitDir(conf); } @@ -130,8 +125,7 @@ public class CoordinatedBolt implements IRichBolt { }); for (int index = 0; index < childs.size() - reserveSize; index++) { - zkClient.delete_node(path + BatchDef.ZK_SEPERATOR - + childs.get(index)); + zkClient.delete_node(path + BatchDef.ZK_SEPERATOR + childs.get(index)); } } @@ -263,8 +257,7 @@ public class CoordinatedBolt implements IRichBolt { } else if (batchStatus == BatchStatus.POST_COMMIT) { handlePostCommit(tuple); } else { - throw new RuntimeException( - "Receive commit tuple, but not committer"); + throw new RuntimeException("Receive commit tuple, but not committer"); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java index fcc54fa..bee87ef 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java @@ -31,8 +31,7 @@ import com.alibaba.jstorm.cluster.DistributedClusterState; import com.alibaba.jstorm.utils.JStormUtils; public class BatchCommon { - private static final Logger LOG = LoggerFactory - .getLogger(BatchCommon.class); + private static final Logger LOG = LoggerFactory.getLogger(BatchCommon.class); private static ClusterState zkClient = null; @@ -44,26 +43,18 @@ public class BatchCommon { List<String> zkServers = null; if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS) != null) { - zkServers = - (List<String>) conf - .get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS); + zkServers = (List<String>) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS); } else if (conf.get(Config.STORM_ZOOKEEPER_SERVERS) != null) { - zkServers = - (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS); } else { throw new RuntimeException("No setting zk"); } int port = 2181; if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT) != null) { - port = - JStormUtils.parseInt( - conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT), - 2181); + port = JStormUtils.parseInt(conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT), 2181); } else if (conf.get(Config.STORM_ZOOKEEPER_PORT) != null) { - port = - JStormUtils.parseInt( - conf.get(Config.STORM_ZOOKEEPER_PORT), 2181); + port = JStormUtils.parseInt(conf.get(Config.STORM_ZOOKEEPER_PORT), 2181); } String root = BatchDef.BATCH_ZK_ROOT; @@ -71,9 +62,7 @@ public class BatchCommon { root = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT); } - root = - root + BatchDef.ZK_SEPERATOR - + conf.get(Config.TOPOLOGY_NAME); + root = root + BatchDef.ZK_SEPERATOR + conf.get(Config.TOPOLOGY_NAME); Map<Object, Object> tmpConf = new HashMap<Object, Object>(); tmpConf.putAll(conf); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java index a5a6835..37653eb 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java @@ -23,27 +23,26 @@ import java.util.Map; import com.alibaba.jstorm.client.ConfigExtension; - - public interface JStormCache extends Serializable { public static final String TAG_TIMEOUT_LIST = ConfigExtension.CACHE_TIMEOUT_LIST; - - void init(Map<Object, Object> conf)throws Exception; + + void init(Map<Object, Object> conf) throws Exception; + void cleanup(); - - Object get(String key) ; - + + Object get(String key); + void getBatch(Map<String, Object> map); void remove(String key); - + void removeBatch(Collection<String> keys); void put(String key, Object value, int timeoutSecond); void put(String key, Object value); - - void putBatch(Map<String, Object> map) ; - + + void putBatch(Map<String, Object> map); + void putBatch(Map<String, Object> map, int timeoutSeconds); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java index e72e3d0..845ed1e 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java @@ -17,101 +17,85 @@ */ package com.alibaba.jstorm.cache; -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.commons.lang.StringUtils; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.WriteBatch; -import org.rocksdb.WriteOptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import backtype.storm.utils.Utils; - import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; +import org.apache.commons.lang.StringUtils; +import org.rocksdb.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; +import java.util.Map.Entry; public class RocksDBCache implements JStormCache { private static final long serialVersionUID = 705938812240167583L; private static Logger LOG = LoggerFactory.getLogger(RocksDBCache.class); - + static { RocksDB.loadLibrary(); } - + public static final String ROCKSDB_ROOT_DIR = "rocksdb.root.dir"; public static final String ROCKSDB_RESET = "rocksdb.reset"; protected RocksDB db; protected String rootDir; - + public void initDir(Map<Object, Object> conf) { String confDir = (String) conf.get(ROCKSDB_ROOT_DIR); if (StringUtils.isBlank(confDir) == true) { throw new RuntimeException("Doesn't set rootDir of rocksDB"); } - - boolean clean = ConfigExtension.getNimbusCacheReset(conf); + + boolean clean = (Boolean) conf.get(ROCKSDB_RESET); LOG.info("RocksDB reset is " + clean); if (clean == true) { try { PathUtils.rmr(confDir); } catch (IOException e) { - // TODO Auto-generated catch block throw new RuntimeException("Failed to cleanup rooDir of rocksDB " + confDir); } } - + File file = new File(confDir); if (file.exists() == false) { try { PathUtils.local_mkdirs(confDir); file = new File(confDir); } catch (IOException e) { - // TODO Auto-generated catch block throw new RuntimeException("Failed to mkdir rooDir of rocksDB " + confDir); } } - + rootDir = file.getAbsolutePath(); } - + public void initDb(List<Integer> list) throws Exception { LOG.info("Begin to init rocksDB of {}", rootDir); - + Options dbOptions = null; - try { dbOptions = new Options().setCreateMissingColumnFamilies(true).setCreateIfMissing(true); - + List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<ColumnFamilyHandle>(); - + db = RocksDB.open(dbOptions, rootDir); - + LOG.info("Successfully init rocksDB of {}", rootDir); } finally { - if (dbOptions != null) { dbOptions.dispose(); } } } - + @Override public void init(Map<Object, Object> conf) throws Exception { - // TODO Auto-generated method stub initDir(conf); - + List<Integer> list = new ArrayList<Integer>(); if (conf.get(TAG_TIMEOUT_LIST) != null) { for (Object obj : (List) ConfigExtension.getCacheTimeoutList(conf)) { @@ -119,11 +103,11 @@ public class RocksDBCache implements JStormCache { if (timeoutSecond == null || timeoutSecond <= 0) { continue; } - + list.add(timeoutSecond); } } - + // Add retry logic boolean isSuccess = false; for (int i = 0; i < 3; i++) { @@ -135,32 +119,29 @@ public class RocksDBCache implements JStormCache { LOG.warn("Failed to init rocksDB " + rootDir, e); try { PathUtils.rmr(rootDir); - } catch (IOException e1) { - // TODO Auto-generated catch block - + } catch (IOException ignored) { } } } - + if (isSuccess == false) { throw new RuntimeException("Failed to init rocksDB " + rootDir); } } - + @Override public void cleanup() { LOG.info("Begin to close rocketDb of {}", rootDir); - + if (db != null) { db.close(); } - + LOG.info("Successfully closed rocketDb of {}", rootDir); } - + @Override public Object get(String key) { - // TODO Auto-generated method stub try { byte[] data = db.get(key.getBytes()); if (data != null) { @@ -172,36 +153,35 @@ public class RocksDBCache implements JStormCache { return null; } } - + } catch (Exception e) { - } - + return null; } - + @Override public void getBatch(Map<String, Object> map) { List<byte[]> lookupKeys = new ArrayList<byte[]>(); for (String key : map.keySet()) { lookupKeys.add(key.getBytes()); } - + try { Map<byte[], byte[]> results = db.multiGet(lookupKeys); if (results == null || results.size() == 0) { return; } - + for (Entry<byte[], byte[]> resultEntry : results.entrySet()) { byte[] keyByte = resultEntry.getKey(); byte[] valueByte = resultEntry.getValue(); - + if (keyByte == null || valueByte == null) { continue; } - - Object value = null; + + Object value; try { value = Utils.javaDeserialize(valueByte); } catch (Exception e) { @@ -209,45 +189,37 @@ public class RocksDBCache implements JStormCache { db.remove(keyByte); continue; } - + map.put(new String(keyByte), value); } - - return; } catch (Exception e) { LOG.error("Failed to query " + map.keySet() + ", in window: "); } - - return; } - + @Override public void remove(String key) { try { db.remove(key.getBytes()); - + } catch (Exception e) { LOG.error("Failed to remove " + key); } - + } - + @Override public void removeBatch(Collection<String> keys) { - // TODO Auto-generated method stub for (String key : keys) { remove(key); } } - + @Override public void put(String key, Object value, int timeoutSecond) { - // TODO Auto-generated method stub - put(key, value); - } - + @Override public void put(String key, Object value) { byte[] data = Utils.javaSerialize(value); @@ -255,38 +227,36 @@ public class RocksDBCache implements JStormCache { db.put(key.getBytes(), data); } catch (Exception e) { LOG.error("Failed put into cache, " + key, e); - return; } } - + @Override public void putBatch(Map<String, Object> map) { - // TODO Auto-generated method stub WriteOptions writeOpts = null; WriteBatch writeBatch = null; - + Set<byte[]> putKeys = new HashSet<byte[]>(); - + try { writeOpts = new WriteOptions(); writeBatch = new WriteBatch(); - + for (Entry<String, Object> entry : map.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); - + byte[] data = Utils.javaSerialize(value); - + if (StringUtils.isBlank(key) || data == null || data.length == 0) { continue; } - + byte[] keyByte = key.getBytes(); writeBatch.put(keyByte, data); - + putKeys.add(keyByte); } - + db.write(writeOpts, writeBatch); } catch (Exception e) { LOG.error("Failed to putBatch into DB, " + map.keySet(), e); @@ -294,18 +264,16 @@ public class RocksDBCache implements JStormCache { if (writeOpts != null) { writeOpts.dispose(); } - + if (writeBatch != null) { writeBatch.dispose(); } } - + } - + @Override public void putBatch(Map<String, Object> map, int timeoutSeconds) { - // TODO Auto-generated method stub putBatch(map); } - }
