http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java
index d972135..24166de 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperAuthInfo.java
@@ -21,11 +21,10 @@ import backtype.storm.Config;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
 
-
 public class ZookeeperAuthInfo {
     public String scheme;
     public byte[] payload = null;
-    
+
     public ZookeeperAuthInfo(Map conf) {
         String scheme = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_SCHEME);
         String payload = (String) conf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
@@ -34,9 +33,9 @@ public class ZookeeperAuthInfo {
             scheme = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_SCHEME);
             payload = (String) conf.get(Config.STORM_ZOOKEEPER_AUTH_PAYLOAD);
         }
-        if(scheme!=null) {
+        if (scheme != null) {
             this.scheme = scheme;
-            if(payload != null) {
+            if (payload != null) {
                 try {
                     this.payload = payload.getBytes("UTF-8");
                 } catch (UnsupportedEncodingException ex) {
@@ -45,7 +44,7 @@ public class ZookeeperAuthInfo {
             }
         }
     }
-    
+
     public ZookeeperAuthInfo(String scheme, byte[] payload) {
         this.scheme = scheme;
         this.payload = payload;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java
index 08a763a..f0e8f9d 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/ZookeeperServerCnxnFactory.java
@@ -27,58 +27,58 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ZookeeperServerCnxnFactory {
-       private static final Logger LOG = LoggerFactory.getLogger(ZookeeperServerCnxnFactory.class);
-       int _port;
-       NIOServerCnxnFactory _factory;
-       
-       public ZookeeperServerCnxnFactory(int port, int maxClientCnxns)  {
-               //port range
-               int max;
-               if (port <= 0) {
-                       _port = 2000;
-                       max = 65535;
-               } else {
-                       _port = port;
-                       max = port;
-               }
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperServerCnxnFactory.class);
+    int _port;
+    NIOServerCnxnFactory _factory;
 
-               try {
-                       _factory = new NIOServerCnxnFactory();
-               } catch (IOException e) {
-                       _port = 0;
-                       _factory = null;
-                       e.printStackTrace();
-                       throw new RuntimeException(e.getMessage());
-               }
-               
-               //look for available port 
-               for (; _port <= max; _port++) {
-                       try {
-                               _factory.configure(new InetSocketAddress(_port), maxClientCnxns);
-                               LOG.debug("Zookeeper server successfully binded at port "+_port);
-                               break;
-                       } catch (BindException e1) {
-                       } catch (IOException e2) {
-                               _port = 0;
-                               _factory = null;
-                               e2.printStackTrace();
-                               throw new RuntimeException(e2.getMessage());
-                       } 
-               }               
+    public ZookeeperServerCnxnFactory(int port, int maxClientCnxns) {
+        // port range
+        int max;
+        if (port <= 0) {
+            _port = 2000;
+            max = 65535;
+        } else {
+            _port = port;
+            max = port;
+        }
 
-               if (_port > max) {
-                       _port = 0;
-                       _factory = null;
-                       LOG.error("Failed to find a port for Zookeeper");
-                       throw new RuntimeException("No port is available to launch an inprocess zookeeper.");
-               }
-       }
-       
-       public int port() {
-               return _port;
-       }
-               
-       public NIOServerCnxnFactory factory() {
-               return _factory;
-       }
+        try {
+            _factory = new NIOServerCnxnFactory();
+        } catch (IOException e) {
+            _port = 0;
+            _factory = null;
+            e.printStackTrace();
+            throw new RuntimeException(e.getMessage());
+        }
+
+        // look for available port
+        for (; _port <= max; _port++) {
+            try {
+                _factory.configure(new InetSocketAddress(_port), maxClientCnxns);
+                LOG.debug("Zookeeper server successfully binded at port " + _port);
+                break;
+            } catch (BindException e1) {
+            } catch (IOException e2) {
+                _port = 0;
+                _factory = null;
+                e2.printStackTrace();
+                throw new RuntimeException(e2.getMessage());
+            }
+        }
+
+        if (_port > max) {
+            _port = 0;
+            _factory = null;
+            LOG.error("Failed to find a port for Zookeeper");
+            throw new RuntimeException("No port is available to launch an inprocess zookeeper.");
+        }
+    }
+
+    public int port() {
+        return _port;
+    }
+
+    public NIOServerCnxnFactory factory() {
+        return _factory;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java
index c7199c6..7a1e18a 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/AbstractSequencerExt.java
@@ -22,17 +22,17 @@ import com.lmax.disruptor.WaitStrategy;
 
 public abstract class AbstractSequencerExt extends AbstractSequencer {
     private static boolean waitSleep = true;
-    
+
     public static boolean isWaitSleep() {
         return waitSleep;
     }
-    
+
     public static void setWaitSleep(boolean waitSleep) {
         AbstractSequencerExt.waitSleep = waitSleep;
     }
-    
+
     public AbstractSequencerExt(int bufferSize, WaitStrategy waitStrategy) {
         super(bufferSize, waitStrategy);
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java
index cb5d7f9..2bcfdec 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/MultiProducerSequencer.java
@@ -33,19 +33,19 @@ import com.lmax.disruptor.util.Util;
  * Suitable for use for sequencing across multiple publisher threads.
  */
 public class MultiProducerSequencer extends AbstractSequencerExt {
-    
+
     private static final Unsafe UNSAFE = Util.getUnsafe();
     private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
     private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);
-    
+
     private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
-    
+
     // availableBuffer tracks the state of each ringbuffer slot
     // see below for more details on the approach
     private final int[] availableBuffer;
     private final int indexMask;
     private final int indexShift;
-    
+
     /**
      * Construct a Sequencer with the selected wait strategy and buffer size.
      * 
@@ -59,7 +59,7 @@ public class MultiProducerSequencer extends AbstractSequencerExt {
         indexShift = Util.log2(bufferSize);
         initialiseAvailableBuffer();
     }
-    
+
     /**
      * @see Sequencer#hasAvailableCapacity(int)
      */
@@ -67,23 +67,23 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
     public boolean hasAvailableCapacity(final int requiredCapacity) {
         return hasAvailableCapacity(gatingSequences, requiredCapacity, 
cursor.get());
     }
-    
+
     private boolean hasAvailableCapacity(Sequence[] gatingSequences, final int 
requiredCapacity, long cursorValue) {
         long wrapPoint = (cursorValue + requiredCapacity) - bufferSize;
         long cachedGatingSequence = gatingSequenceCache.get();
-        
+
         if (wrapPoint > cachedGatingSequence || cachedGatingSequence > 
cursorValue) {
             long minSequence = Util.getMinimumSequence(gatingSequences, 
cursorValue);
             gatingSequenceCache.set(minSequence);
-            
+
             if (wrapPoint > minSequence) {
                 return false;
             }
         }
-        
+
         return true;
     }
-    
+
     /**
      * @see Sequencer#claim(long)
      */
@@ -91,7 +91,7 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
     public void claim(long sequence) {
         cursor.set(sequence);
     }
-    
+
     /**
      * @see Sequencer#next()
      */
@@ -99,7 +99,7 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
     public long next() {
         return next(1);
     }
-    
+
     /**
      * @see Sequencer#next(int)
      */
@@ -108,20 +108,20 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
         if (n < 1) {
             throw new IllegalArgumentException("n must be > 0");
         }
-        
+
         long current;
         long next;
-        
+
         do {
             current = cursor.get();
             next = current + n;
-            
+
             long wrapPoint = next - bufferSize;
             long cachedGatingSequence = gatingSequenceCache.get();
-            
+
             if (wrapPoint > cachedGatingSequence || cachedGatingSequence > 
current) {
                 long gatingSequence = Util.getMinimumSequence(gatingSequences, 
current);
-                
+
                 if (wrapPoint > gatingSequence) {
                     if (AbstractSequencerExt.isWaitSleep()) {
                         try {
@@ -133,16 +133,16 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
                     }
                     continue;
                 }
-                
+
                 gatingSequenceCache.set(gatingSequence);
             } else if (cursor.compareAndSet(current, next)) {
                 break;
             }
         } while (true);
-        
+
         return next;
     }
-    
+
     /**
      * @see Sequencer#tryNext()
      */
@@ -150,7 +150,7 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
     public long tryNext() throws InsufficientCapacityException {
         return tryNext(1);
     }
-    
+
     /**
      * @see Sequencer#tryNext(int)
      */
@@ -159,22 +159,22 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
         if (n < 1) {
             throw new IllegalArgumentException("n must be > 0");
         }
-        
+
         long current;
         long next;
-        
+
         do {
             current = cursor.get();
             next = current + n;
-            
+
             if (!hasAvailableCapacity(gatingSequences, n, current)) {
                 throw InsufficientCapacityException.INSTANCE;
             }
         } while (!cursor.compareAndSet(current, next));
-        
+
         return next;
     }
-    
+
     /**
      * @see Sequencer#remainingCapacity()
      */
@@ -184,15 +184,15 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
         long produced = cursor.get();
         return getBufferSize() - (produced - consumed);
     }
-    
+
     private void initialiseAvailableBuffer() {
         for (int i = availableBuffer.length - 1; i != 0; i--) {
             setAvailableBufferValue(i, -1);
         }
-        
+
         setAvailableBufferValue(0, -1);
     }
-    
+
     /**
      * @see Sequencer#publish(long)
      */
@@ -201,7 +201,7 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
         setAvailable(sequence);
         waitStrategy.signalAllWhenBlocking();
     }
-    
+
     /**
      * @see Sequencer#publish(long, long)
      */
@@ -212,7 +212,7 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
         }
         waitStrategy.signalAllWhenBlocking();
     }
-    
+
     /**
      * The below methods work on the availableBuffer flag.
      * 
@@ -229,12 +229,12 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
     private void setAvailable(final long sequence) {
         setAvailableBufferValue(calculateIndex(sequence), 
calculateAvailabilityFlag(sequence));
     }
-    
+
     private void setAvailableBufferValue(int index, int flag) {
         long bufferAddress = (index * SCALE) + BASE;
         UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
     }
-    
+
     /**
      * @see Sequencer#isAvailable(long)
      */
@@ -245,7 +245,7 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
         long bufferAddress = (index * SCALE) + BASE;
         return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
     }
-    
+
     @Override
     public long getHighestPublishedSequence(long lowerBound, long 
availableSequence) {
         for (long sequence = lowerBound; sequence <= availableSequence; 
sequence++) {
@@ -253,14 +253,14 @@ public class MultiProducerSequencer extends 
AbstractSequencerExt {
                 return sequence - 1;
             }
         }
-        
+
         return availableSequence;
     }
-    
+
     private int calculateAvailabilityFlag(final long sequence) {
         return (int) (sequence >>> indexShift);
     }
-    
+
     private int calculateIndex(final long sequence) {
         return ((int) sequence) & indexMask;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java
index da124f0..763294b 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/RingBuffer.java
@@ -43,12 +43,12 @@ import backtype.storm.utils.disruptor.SingleProducerSequencer;
  */
 public class RingBuffer<E> implements Cursored, DataProvider<E> {
     public static final long INITIAL_CURSOR_VALUE = -1L;
-    
+
     private final int indexMask;
     private final Object[] entries;
     private final int bufferSize;
     private final Sequencer sequencer;
-    
+
     /**
      * Construct a RingBuffer with the full option set.
      * 
@@ -59,19 +59,19 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer) {
         this.sequencer = sequencer;
         this.bufferSize = sequencer.getBufferSize();
-        
+
         if (bufferSize < 1) {
             throw new IllegalArgumentException("bufferSize must not be less 
than 1");
         }
         if (Integer.bitCount(bufferSize) != 1) {
             throw new IllegalArgumentException("bufferSize must be a power of 
2");
         }
-        
+
         this.indexMask = bufferSize - 1;
         this.entries = new Object[sequencer.getBufferSize()];
         fill(eventFactory);
     }
-    
+
     /**
      * Create a new multiple producer RingBuffer with the specified wait 
strategy.
      * 
@@ -83,10 +83,10 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
      */
     public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> 
factory, int bufferSize, WaitStrategy waitStrategy) {
         MultiProducerSequencer sequencer = new 
MultiProducerSequencer(bufferSize, waitStrategy);
-        
+
         return new RingBuffer<E>(factory, sequencer);
     }
-    
+
     /**
      * Create a new multiple producer RingBuffer using the default wait 
strategy {@link BlockingWaitStrategy}.
      * 
@@ -98,7 +98,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> 
factory, int bufferSize) {
         return createMultiProducer(factory, bufferSize, new 
BlockingWaitStrategy());
     }
-    
+
     /**
      * Create a new single producer RingBuffer with the specified wait 
strategy.
      * 
@@ -110,10 +110,10 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
      */
     public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> 
factory, int bufferSize, WaitStrategy waitStrategy) {
         SingleProducerSequencer sequencer = new 
SingleProducerSequencer(bufferSize, waitStrategy);
-        
+
         return new RingBuffer<E>(factory, sequencer);
     }
-    
+
     /**
      * Create a new single producer RingBuffer using the default wait strategy 
{@link BlockingWaitStrategy}.
      * 
@@ -125,7 +125,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> 
factory, int bufferSize) {
         return createSingleProducer(factory, bufferSize, new 
BlockingWaitStrategy());
     }
-    
+
     /**
      * Create a new Ring Buffer with the specified producer type (SINGLE or 
MULTI)
      * 
@@ -145,7 +145,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             throw new IllegalStateException(producerType.toString());
         }
     }
-    
+
     /**
      * <p>
      * Get the event for a given sequence in the RingBuffer.
@@ -168,7 +168,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public E get(long sequence) {
         return (E) entries[(int) sequence & indexMask];
     }
-    
+
     /**
      * @deprecated Use {@link RingBuffer#get(long)}
      */
@@ -176,7 +176,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public E getPreallocated(long sequence) {
         return get(sequence);
     }
-    
+
     /**
      * @deprecated Use {@link RingBuffer#get(long)}
      */
@@ -184,7 +184,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public E getPublished(long sequence) {
         return get(sequence);
     }
-    
+
     /**
      * Increment and return the next sequence for the ring buffer. Calls of 
this method should ensure that they always publish the sequence afterward. E.g.
      * 
@@ -205,7 +205,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public long next() {
         return sequencer.next();
     }
-    
+
     /**
      * The same functionality as {@link RingBuffer#next()}, but allows the 
caller to claim the next n sequences.
      * 
@@ -216,7 +216,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public long next(int n) {
         return sequencer.next(n);
     }
-    
+
     /**
      * <p>
      * Increment and return the next sequence for the ring buffer. Calls of 
this method should ensure that they always publish the sequence afterward. E.g.
@@ -242,7 +242,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public long tryNext() throws InsufficientCapacityException {
         return sequencer.tryNext();
     }
-    
+
     /**
      * The same functionality as {@link RingBuffer#tryNext()}, but allows the 
caller to attempt to claim the next n sequences.
      * 
@@ -253,7 +253,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public long tryNext(int n) throws InsufficientCapacityException {
         return sequencer.tryNext(n);
     }
-    
+
     /**
      * Resets the cursor to a specific value. This can be applied at any time, 
but it is worth not that it is a racy thing to do and should only be used in
      * controlled circumstances. E.g. during initialisation.
@@ -265,7 +265,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         sequencer.claim(sequence);
         sequencer.publish(sequence);
     }
-    
+
     /**
      * Sets the cursor to a specific sequence and returns the preallocated 
entry that is stored there. This is another deliberately racy call, that should 
only
      * be done in controlled circumstances, e.g. initialisation.
@@ -277,7 +277,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         sequencer.claim(sequence);
         return get(sequence);
     }
-    
+
     /**
      * Determines if a particular entry has been published.
      * 
@@ -287,7 +287,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public boolean isPublished(long sequence) {
         return sequencer.isAvailable(sequence);
     }
-    
+
     /**
      * Add the specified gating sequences to this instance of the Disruptor. 
They will safely and atomically added to the list of gating sequences.
      * 
@@ -296,7 +296,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public void addGatingSequences(Sequence... gatingSequences) {
         sequencer.addGatingSequences(gatingSequences);
     }
-    
+
     /**
      * Get the minimum sequence value from all of the gating sequences added 
to this ringBuffer.
      * 
@@ -305,7 +305,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public long getMinimumGatingSequence() {
         return sequencer.getMinimumSequence();
     }
-    
+
     /**
      * Remove the specified sequence from this ringBuffer.
      * 
@@ -315,7 +315,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public boolean removeGatingSequence(Sequence sequence) {
         return sequencer.removeGatingSequence(sequence);
     }
-    
+
     /**
      * Create a new SequenceBarrier to be used by an EventProcessor to track 
which messages are available to be read from the ring buffer given a list of
      * sequences to track.
@@ -327,7 +327,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
         return sequencer.newBarrier(sequencesToTrack);
     }
-    
+
     /**
      * Get the current cursor value for the ring buffer. The cursor value is 
the last value that was published, or the highest available sequence that can be
      * consumed.
@@ -335,14 +335,14 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public long getCursor() {
         return sequencer.getCursor();
     }
-    
+
     /**
      * The size of the buffer.
      */
     public int getBufferSize() {
         return bufferSize;
     }
-    
+
     /**
      * Given specified <tt>requiredCapacity</tt> determines if that amount of 
space is available. Note, you can not assume that if this method returns
      * <tt>true</tt> that a call to {@link RingBuffer#next()} will not block. 
Especially true if this ring buffer is set up to handle multiple producers.
@@ -353,7 +353,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public boolean hasAvailableCapacity(int requiredCapacity) {
         return sequencer.hasAvailableCapacity(requiredCapacity);
     }
-    
+
     /**
      * Publishes an event to the ring buffer. It handles claiming the next 
sequence, getting the current (uninitialised) event from the ring buffer and
      * publishing the claimed sequence after translation.
@@ -364,7 +364,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long sequence = sequencer.next();
         translateAndPublish(translator, sequence);
     }
-    
+
     /**
      * Attempts to publish an event to the ring buffer. It handles claiming 
the next sequence, getting the current (uninitialised) event from the ring 
buffer
      * and publishing the claimed sequence after translation. Will return 
false if specified capacity was not available.
@@ -381,7 +381,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows one user supplied argument.
      * 
@@ -393,7 +393,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long sequence = sequencer.next();
         translateAndPublish(translator, sequence, arg0);
     }
-    
+
     /**
      * Allows one user supplied argument.
      * 
@@ -411,7 +411,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows two user supplied arguments.
      * 
@@ -424,7 +424,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long sequence = sequencer.next();
         translateAndPublish(translator, sequence, arg0, arg1);
     }
-    
+
     /**
      * Allows two user supplied arguments.
      * 
@@ -443,7 +443,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows three user supplied arguments
      * 
@@ -457,7 +457,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long sequence = sequencer.next();
         translateAndPublish(translator, sequence, arg0, arg1, arg2);
     }
-    
+
     /**
      * Allows three user supplied arguments
      * 
@@ -477,7 +477,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows a variable number of user supplied arguments
      * 
@@ -489,7 +489,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long sequence = sequencer.next();
         translateAndPublish(translator, sequence, args);
     }
-    
+
     /**
      * Allows a variable number of user supplied arguments
      * 
@@ -507,7 +507,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Publishes multiple events to the ring buffer. It handles claiming the 
next sequence, getting the current (uninitialised) event from the ring buffer 
and
      * publishing the claimed sequence after translation.
@@ -517,7 +517,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public void publishEvents(EventTranslator<E>[] translators) {
         publishEvents(translators, 0, translators.length);
     }
-    
+
     /**
      * Publishes multiple events to the ring buffer. It handles claiming the 
next sequence, getting the current (uninitialised) event from the ring buffer 
and
      * publishing the claimed sequence after translation.
@@ -531,7 +531,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long finalSequence = sequencer.next(batchSize);
         translateAndPublishBatch(translators, batchStartsAt, batchSize, 
finalSequence);
     }
-    
+
     /**
      * Attempts to publish multiple events to the ring buffer. It handles 
claiming the next sequence, getting the current (uninitialised) event from the 
ring
      * buffer and publishing the claimed sequence after translation. Will 
return false if specified capacity was not available.
@@ -542,7 +542,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public boolean tryPublishEvents(EventTranslator<E>[] translators) {
         return tryPublishEvents(translators, 0, translators.length);
     }
-    
+
     /**
      * Attempts to publish multiple events to the ring buffer. It handles 
claiming the next sequence, getting the current (uninitialised) event from the 
ring
      * buffer and publishing the claimed sequence after translation. Will 
return false if specified capacity was not available.
@@ -562,18 +562,18 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows one user supplied argument per event.
      * 
      * @param translator The user specified translation for the event
      * @param arg0 A user supplied argument.
-     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #publishEvents(EventTranslator[])
      */
     public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] 
arg0) {
         publishEvents(translator, 0, arg0.length, arg0);
     }
-    
+
     /**
      * Allows one user supplied argument per event.
      * 
@@ -588,19 +588,19 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long finalSequence = sequencer.next(batchSize);
         translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, 
finalSequence);
     }
-    
+
     /**
      * Allows one user supplied argument.
      * 
      * @param translator The user specified translation for each event
      * @param arg0 An array of user supplied arguments, one element per event.
      * @return true if the value was published, false if there was 
insufficient capacity.
-     * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #tryPublishEvents(EventTranslator[])
      */
     public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> 
translator, A[] arg0) {
         return tryPublishEvents(translator, 0, arg0.length, arg0);
     }
-    
+
     /**
      * Allows one user supplied argument.
      * 
@@ -621,19 +621,19 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows two user supplied arguments per event.
      * 
      * @param translator The user specified translation for the event
      * @param arg0 An array of user supplied arguments, one element per event.
      * @param arg1 An array of user supplied arguments, one element per event.
-     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #publishEvents(EventTranslator[])
      */
     public <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> 
translator, A[] arg0, B[] arg1) {
         publishEvents(translator, 0, arg0.length, arg0, arg1);
     }
-    
+
     /**
      * Allows two user supplied arguments per event.
      * 
@@ -649,7 +649,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long finalSequence = sequencer.next(batchSize);
         translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, 
batchSize, finalSequence);
     }
-    
+
     /**
      * Allows two user supplied arguments per event.
      * 
@@ -657,12 +657,12 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
      * @param arg0 An array of user supplied arguments, one element per event.
      * @param arg1 An array of user supplied arguments, one element per event.
      * @return true if the value was published, false if there was 
insufficient capacity.
-     * @see #tryPublishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #tryPublishEvents(EventTranslator[])
      */
     public <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> 
translator, A[] arg0, B[] arg1) {
         return tryPublishEvents(translator, 0, arg0.length, arg0, arg1);
     }
-    
+
     /**
      * Allows two user supplied arguments per event.
      * 
@@ -684,7 +684,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows three user supplied arguments per event.
      * 
@@ -692,12 +692,12 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
      * @param arg0 An array of user supplied arguments, one element per event.
      * @param arg1 An array of user supplied arguments, one element per event.
      * @param arg2 An array of user supplied arguments, one element per event.
-     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #publishEvents(EventTranslator[])
      */
     public <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> 
translator, A[] arg0, B[] arg1, C[] arg2) {
         publishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
     }
-    
+
     /**
      * Allows three user supplied arguments per event.
      * 
@@ -714,7 +714,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long finalSequence = sequencer.next(batchSize);
         translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, 
batchSize, finalSequence);
     }
-    
+
     /**
      * Allows three user supplied arguments per event.
      * 
@@ -723,12 +723,12 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
      * @param arg1 An array of user supplied arguments, one element per event.
      * @param arg2 An array of user supplied arguments, one element per event.
      * @return true if the value was published, false if there was 
insufficient capacity.
-     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #publishEvents(EventTranslator[])
      */
     public <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, 
C> translator, A[] arg0, B[] arg1, C[] arg2) {
         return tryPublishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
     }
-    
+
     /**
      * Allows three user supplied arguments per event.
      * 
@@ -751,18 +751,18 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Allows a variable number of user supplied arguments per event.
      * 
      * @param translator The user specified translation for the event
      * @param args User supplied arguments, one Object[] per event.
-     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #publishEvents(EventTranslator[])
      */
     public void publishEvents(EventTranslatorVararg<E> translator, Object[]... 
args) {
         publishEvents(translator, 0, args.length, args);
     }
-    
+
     /**
      * Allows a variable number of user supplied arguments per event.
      * 
@@ -777,19 +777,19 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
         final long finalSequence = sequencer.next(batchSize);
         translateAndPublishBatch(translator, batchStartsAt, batchSize, 
finalSequence, args);
     }
-    
+
     /**
      * Allows a variable number of user supplied arguments per event.
      * 
      * @param translator The user specified translation for the event
      * @param args User supplied arguments, one Object[] per event.
      * @return true if the value was published, false if there was 
insufficient capacity.
-     * @see #publishEvents(com.lmax.disruptor.EventTranslator[])
+     * @see #publishEvents(EventTranslator[])
      */
     public boolean tryPublishEvents(EventTranslatorVararg<E> translator, 
Object[]... args) {
         return tryPublishEvents(translator, 0, args.length, args);
     }
-    
+
     /**
      * Allows a variable number of user supplied arguments per event.
      * 
@@ -810,7 +810,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             return false;
         }
     }
-    
+
     /**
      * Publish the specified sequence. This action marks this particular 
message as being available to be read.
      * 
@@ -819,7 +819,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public void publish(long sequence) {
         sequencer.publish(sequence);
     }
-    
+
     /**
      * Publish the specified sequences. This action marks these particular 
messages as being available to be read.
      * 
@@ -830,7 +830,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public void publish(long lo, long hi) {
         sequencer.publish(lo, hi);
     }
-    
+
     /**
      * Get the remaining capacity for this ringBuffer.
      * 
@@ -839,49 +839,51 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
     public long remainingCapacity() {
         return sequencer.remainingCapacity();
     }
-    
+
     private void checkBounds(final EventTranslator<E>[] translators, final int 
batchStartsAt, final int batchSize) {
         checkBatchSizing(batchStartsAt, batchSize);
         batchOverRuns(translators, batchStartsAt, batchSize);
     }
-    
+
     private void checkBatchSizing(int batchStartsAt, int batchSize) {
         if (batchStartsAt < 0 || batchSize < 0) {
-            throw new IllegalArgumentException("Both batchStartsAt and 
batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and 
batchSize " + batchSize);
+            throw new IllegalArgumentException("Both batchStartsAt and 
batchSize must be positive but got: batchStartsAt " + batchStartsAt + " and 
batchSize "
+                    + batchSize);
         } else if (batchSize > bufferSize) {
             throw new IllegalArgumentException("The ring buffer cannot 
accommodate " + batchSize + " it only has space for " + bufferSize + " 
entities.");
         }
     }
-    
+
     private <A> void checkBounds(final A[] arg0, final int batchStartsAt, 
final int batchSize) {
         checkBatchSizing(batchStartsAt, batchSize);
         batchOverRuns(arg0, batchStartsAt, batchSize);
     }
-    
+
     private <A, B> void checkBounds(final A[] arg0, final B[] arg1, final int 
batchStartsAt, final int batchSize) {
         checkBatchSizing(batchStartsAt, batchSize);
         batchOverRuns(arg0, batchStartsAt, batchSize);
         batchOverRuns(arg1, batchStartsAt, batchSize);
     }
-    
+
     private <A, B, C> void checkBounds(final A[] arg0, final B[] arg1, final 
C[] arg2, final int batchStartsAt, final int batchSize) {
         checkBatchSizing(batchStartsAt, batchSize);
         batchOverRuns(arg0, batchStartsAt, batchSize);
         batchOverRuns(arg1, batchStartsAt, batchSize);
         batchOverRuns(arg2, batchStartsAt, batchSize);
     }
-    
+
     private void checkBounds(final int batchStartsAt, final int batchSize, 
final Object[][] args) {
         checkBatchSizing(batchStartsAt, batchSize);
         batchOverRuns(args, batchStartsAt, batchSize);
     }
-    
+
     private <A> void batchOverRuns(final A[] arg0, final int batchStartsAt, 
final int batchSize) {
         if (batchStartsAt + batchSize > arg0.length) {
-            throw new IllegalArgumentException("A batchSize of: " + batchSize 
+ " with batchStatsAt of: " + batchStartsAt + " will overrun the available 
number of arguments: " + (arg0.length - batchStartsAt));
+            throw new IllegalArgumentException("A batchSize of: " + batchSize 
+ " with batchStatsAt of: " + batchStartsAt
+                    + " will overrun the available number of arguments: " + 
(arg0.length - batchStartsAt));
         }
     }
-    
+
     private void translateAndPublish(EventTranslator<E> translator, long 
sequence) {
         try {
             translator.translateTo(get(sequence), sequence);
@@ -889,7 +891,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(sequence);
         }
     }
-    
+
     private <A> void translateAndPublish(EventTranslatorOneArg<E, A> 
translator, long sequence, A arg0) {
         try {
             translator.translateTo(get(sequence), sequence, arg0);
@@ -897,7 +899,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(sequence);
         }
     }
-    
+
     private <A, B> void translateAndPublish(EventTranslatorTwoArg<E, A, B> 
translator, long sequence, A arg0, B arg1) {
         try {
             translator.translateTo(get(sequence), sequence, arg0, arg1);
@@ -905,7 +907,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(sequence);
         }
     }
-    
+
     private <A, B, C> void translateAndPublish(EventTranslatorThreeArg<E, A, 
B, C> translator, long sequence, A arg0, B arg1, C arg2) {
         try {
             translator.translateTo(get(sequence), sequence, arg0, arg1, arg2);
@@ -913,7 +915,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(sequence);
         }
     }
-    
+
     private void translateAndPublish(EventTranslatorVararg<E> translator, long 
sequence, Object... args) {
         try {
             translator.translateTo(get(sequence), sequence, args);
@@ -921,7 +923,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(sequence);
         }
     }
-    
+
     private void translateAndPublishBatch(final EventTranslator<E>[] 
translators, int batchStartsAt, final int batchSize, final long finalSequence) {
         final long initialSequence = finalSequence - (batchSize - 1);
         try {
@@ -935,8 +937,9 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(initialSequence, finalSequence);
         }
     }
-    
-    private <A> void translateAndPublishBatch(final EventTranslatorOneArg<E, 
A> translator, final A[] arg0, int batchStartsAt, final int batchSize, final 
long finalSequence) {
+
+    private <A> void translateAndPublishBatch(final EventTranslatorOneArg<E, 
A> translator, final A[] arg0, int batchStartsAt, final int batchSize,
+            final long finalSequence) {
         final long initialSequence = finalSequence - (batchSize - 1);
         try {
             long sequence = initialSequence;
@@ -948,8 +951,9 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(initialSequence, finalSequence);
         }
     }
-    
-    private <A, B> void translateAndPublishBatch(final 
EventTranslatorTwoArg<E, A, B> translator, final A[] arg0, final B[] arg1, int 
batchStartsAt, int batchSize, final long finalSequence) {
+
+    private <A, B> void translateAndPublishBatch(final 
EventTranslatorTwoArg<E, A, B> translator, final A[] arg0, final B[] arg1, int 
batchStartsAt,
+            int batchSize, final long finalSequence) {
         final long initialSequence = finalSequence - (batchSize - 1);
         try {
             long sequence = initialSequence;
@@ -961,8 +965,9 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(initialSequence, finalSequence);
         }
     }
-    
-    private <A, B, C> void translateAndPublishBatch(final 
EventTranslatorThreeArg<E, A, B, C> translator, final A[] arg0, final B[] arg1, 
final C[] arg2, int batchStartsAt, final int batchSize, final long 
finalSequence) {
+
+    private <A, B, C> void translateAndPublishBatch(final 
EventTranslatorThreeArg<E, A, B, C> translator, final A[] arg0, final B[] arg1, 
final C[] arg2,
+            int batchStartsAt, final int batchSize, final long finalSequence) {
         final long initialSequence = finalSequence - (batchSize - 1);
         try {
             long sequence = initialSequence;
@@ -974,8 +979,9 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(initialSequence, finalSequence);
         }
     }
-    
-    private void translateAndPublishBatch(final EventTranslatorVararg<E> 
translator, int batchStartsAt, final int batchSize, final long finalSequence, 
final Object[][] args) {
+
+    private void translateAndPublishBatch(final EventTranslatorVararg<E> 
translator, int batchStartsAt, final int batchSize, final long finalSequence,
+            final Object[][] args) {
         final long initialSequence = finalSequence - (batchSize - 1);
         try {
             long sequence = initialSequence;
@@ -987,7 +993,7 @@ public class RingBuffer<E> implements Cursored, 
DataProvider<E> {
             sequencer.publish(initialSequence, finalSequence);
         }
     }
-    
+
     private void fill(EventFactory<E> eventFactory) {
         for (int i = 0; i < entries.length; i++) {
             entries[i] = eventFactory.newInstance();

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java
 
b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java
index 5ca2724..ad0843b 100755
--- 
a/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java
+++ 
b/jstorm-core/src/main/java/backtype/storm/utils/disruptor/SingleProducerSequencer.java
@@ -40,9 +40,9 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
         /** Set to -1 as sequence starting point */
         public long nextValue = -1L, cachedValue = -1L, p2, p3, p4, p5, p6, p7;
     }
-    
+
     private final Padding pad = new Padding();
-    
+
     /**
      * Construct a Sequencer with the selected wait strategy and buffer size.
      * 
@@ -52,29 +52,29 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
     public SingleProducerSequencer(int bufferSize, final WaitStrategy 
waitStrategy) {
         super(bufferSize, waitStrategy);
     }
-    
+
     /**
      * @see Sequencer#hasAvailableCapacity(int)
      */
     @Override
     public boolean hasAvailableCapacity(final int requiredCapacity) {
         long nextValue = pad.nextValue;
-        
+
         long wrapPoint = (nextValue + requiredCapacity) - bufferSize;
         long cachedGatingSequence = pad.cachedValue;
-        
+
         if (wrapPoint > cachedGatingSequence || cachedGatingSequence > 
nextValue) {
             long minSequence = Util.getMinimumSequence(gatingSequences, 
nextValue);
             pad.cachedValue = minSequence;
-            
+
             if (wrapPoint > minSequence) {
                 return false;
             }
         }
-        
+
         return true;
     }
-    
+
     /**
      * @see Sequencer#next()
      */
@@ -82,7 +82,7 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
     public long next() {
         return next(1);
     }
-    
+
     /**
      * @see Sequencer#next(int)
      */
@@ -91,13 +91,13 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
         if (n < 1) {
             throw new IllegalArgumentException("n must be > 0");
         }
-        
+
         long nextValue = pad.nextValue;
-        
+
         long nextSequence = nextValue + n;
         long wrapPoint = nextSequence - bufferSize;
         long cachedGatingSequence = pad.cachedValue;
-        
+
         if (wrapPoint > cachedGatingSequence || cachedGatingSequence > 
nextValue) {
             long minSequence;
             while (wrapPoint > (minSequence = 
Util.getMinimumSequence(gatingSequences, nextValue))) {
@@ -110,15 +110,15 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
                     LockSupport.parkNanos(1);
                 }
             }
-            
+
             pad.cachedValue = minSequence;
         }
-        
+
         pad.nextValue = nextSequence;
-        
+
         return nextSequence;
     }
-    
+
     /**
      * @see Sequencer#tryNext()
      */
@@ -126,7 +126,7 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
     public long tryNext() throws InsufficientCapacityException {
         return tryNext(1);
     }
-    
+
     /**
      * @see Sequencer#tryNext(int)
      */
@@ -135,28 +135,28 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
         if (n < 1) {
             throw new IllegalArgumentException("n must be > 0");
         }
-        
+
         if (!hasAvailableCapacity(n)) {
             throw InsufficientCapacityException.INSTANCE;
         }
-        
+
         long nextSequence = pad.nextValue += n;
-        
+
         return nextSequence;
     }
-    
+
     /**
      * @see Sequencer#remainingCapacity()
      */
     @Override
     public long remainingCapacity() {
         long nextValue = pad.nextValue;
-        
+
         long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
         long produced = nextValue;
         return getBufferSize() - (produced - consumed);
     }
-    
+
     /**
      * @see Sequencer#claim(long)
      */
@@ -164,7 +164,7 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
     public void claim(long sequence) {
         pad.nextValue = sequence;
     }
-    
+
     /**
      * @see Sequencer#publish(long)
      */
@@ -173,7 +173,7 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
         cursor.set(sequence);
         waitStrategy.signalAllWhenBlocking();
     }
-    
+
     /**
      * @see Sequencer#publish(long, long)
      */
@@ -181,7 +181,7 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
     public void publish(long lo, long hi) {
         publish(hi);
     }
-    
+
     /**
      * @see Sequencer#isAvailable(long)
      */
@@ -189,7 +189,7 @@ public class SingleProducerSequencer extends 
AbstractSequencerExt {
     public boolean isAvailable(long sequence) {
         return sequence <= cursor.get();
     }
-    
+
     @Override
     public long getHighestPublishedSequence(long lowerBound, long 
availableSequence) {
         return availableSequence;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java
index 807c5ec..4d0b713 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchId.java
@@ -59,8 +59,7 @@ public class BatchId implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.SHORT_PREFIX_STYLE);
     }
 
     private static AtomicLong staticId = new AtomicLong(0);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
index 85dec6c..cff7b34 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/BatchTopologyBuilder.java
@@ -30,8 +30,7 @@ import com.alibaba.jstorm.batch.impl.CoordinatedBolt;
 import com.alibaba.jstorm.batch.util.BatchDef;
 
 public class BatchTopologyBuilder {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(BatchTopologyBuilder.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchTopologyBuilder.class);
 
     private TopologyBuilder topologyBuilder;
 
@@ -40,17 +39,13 @@ public class BatchTopologyBuilder {
     public BatchTopologyBuilder(String topologyName) {
         topologyBuilder = new TopologyBuilder();
 
-        spoutDeclarer =
-                topologyBuilder.setSpout(BatchDef.SPOUT_TRIGGER,
-                        new BatchSpoutTrigger(), 1);
+        spoutDeclarer = topologyBuilder.setSpout(BatchDef.SPOUT_TRIGGER, new 
BatchSpoutTrigger(), 1);
     }
 
     public BoltDeclarer setSpout(String id, IBatchSpout spout, int paralel) {
 
-        BoltDeclarer boltDeclarer =
-                this.setBolt(id, (IBatchSpout) spout, paralel);
-        boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-                BatchDef.COMPUTING_STREAM_ID);
+        BoltDeclarer boltDeclarer = this.setBolt(id, (IBatchSpout) spout, 
paralel);
+        boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, 
BatchDef.COMPUTING_STREAM_ID);
 
         return boltDeclarer;
     }
@@ -58,24 +53,19 @@ public class BatchTopologyBuilder {
     public BoltDeclarer setBolt(String id, IBasicBolt bolt, int paralel) {
         CoordinatedBolt coordinatedBolt = new CoordinatedBolt(bolt);
 
-        BoltDeclarer boltDeclarer =
-                topologyBuilder.setBolt(id, coordinatedBolt, paralel);
+        BoltDeclarer boltDeclarer = topologyBuilder.setBolt(id, 
coordinatedBolt, paralel);
 
         if (bolt instanceof IPrepareCommit) {
-            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-                    BatchDef.PREPARE_STREAM_ID);
+            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, 
BatchDef.PREPARE_STREAM_ID);
         }
 
         if (bolt instanceof ICommitter) {
-            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-                    BatchDef.COMMIT_STREAM_ID);
-            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-                    BatchDef.REVERT_STREAM_ID);
+            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, 
BatchDef.COMMIT_STREAM_ID);
+            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, 
BatchDef.REVERT_STREAM_ID);
         }
 
         if (bolt instanceof IPostCommit) {
-            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER,
-                    BatchDef.POST_STREAM_ID);
+            boltDeclarer.allGrouping(BatchDef.SPOUT_TRIGGER, 
BatchDef.POST_STREAM_ID);
         }
 
         return boltDeclarer;

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
index 591f0f0..ef917b2 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IBatchSpout.java
@@ -28,8 +28,7 @@ public interface IBatchSpout extends IBasicBolt, ICommitter, 
Serializable {
      * 
      * execute only receive trigger message
      * 
-     * do emitBatch operation in execute whose streamID is
-     * "batch/compute-stream"
+     * do emitBatch operation in execute whose streamID is 
"batch/compute-stream"
      */
     // void execute(Tuple input, IBasicOutputCollector collector);
     /**
@@ -44,8 +43,7 @@ public interface IBatchSpout extends IBasicBolt, ICommitter, 
Serializable {
     /**
      * begin to revert batchId's data
      * 
-     * If current task fails to commit batchId, it won't call revert(batchId) 
If
-     * current task fails to revert batchId, JStorm won't call revert again.
+     * If current task fails to commit batchId, it won't call revert(batchId) 
If current task fails to revert batchId, JStorm won't call revert again.
      * 
      * if not transaction, it can don't care revert
      * 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
index 16f10da..83845ae 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/ICommitter.java
@@ -29,8 +29,7 @@ import backtype.storm.topology.FailedException;
  */
 public interface ICommitter extends Serializable {
     /**
-     * begin to commit batchId's data, then return the commit result The
-     * commitResult will store into outside storage
+     * begin to commit batchId's data, then return the commit result The 
commitResult will store into outside storage
      * 
      * if failed to commit, please throw FailedException
      * 
@@ -43,8 +42,7 @@ public interface ICommitter extends Serializable {
     /**
      * begin to revert batchId's data
      * 
-     * If current task fails to commit batchId, it won't call revert(batchId) 
If
-     * current task fails to revert batchId, JStorm won't call revert again.
+     * If current task fails to commit batchId, it won't call revert(batchId) 
If current task fails to revert batchId, JStorm won't call revert again.
      * 
      * @param id
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
index aa75f9e..e03d58f 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/IPrepareCommit.java
@@ -33,6 +33,5 @@ public interface IPrepareCommit {
      * @param id
      * @param collector
      */
-    void prepareCommit(BatchId id, BasicOutputCollector collector)
-            throws FailedException;
+    void prepareCommit(BatchId id, BasicOutputCollector collector) throws 
FailedException;
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
index 99b1915..d76a8d7 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutMsgId.java
@@ -59,7 +59,6 @@ public class BatchSpoutMsgId implements Serializable {
 
     @Override
     public String toString() {
-        return ToStringBuilder.reflectionToString(this,
-                ToStringStyle.SHORT_PREFIX_STYLE);
+        return ToStringBuilder.reflectionToString(this, 
ToStringStyle.SHORT_PREFIX_STYLE);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
index c1cdae4..edb882b 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/BatchSpoutTrigger.java
@@ -53,8 +53,7 @@ public class BatchSpoutTrigger implements IRichSpout {
     /**  */
     private static final long serialVersionUID = 7215109169247425954L;
 
-    private static final Logger LOG = LoggerFactory
-            .getLogger(BatchSpoutTrigger.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(BatchSpoutTrigger.class);
 
     private LinkedBlockingQueue<BatchSpoutMsgId> batchQueue;
 
@@ -95,9 +94,7 @@ public class BatchSpoutTrigger implements IRichSpout {
             BatchId.updateId(zkMsgId);
         }
 
-        int max_spout_pending =
-                JStormUtils.parseInt(
-                        conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1);
+        int max_spout_pending = 
JStormUtils.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 1);
 
         for (int i = 0; i < max_spout_pending; i++) {
             BatchSpoutMsgId msgId = BatchSpoutMsgId.mkInstance();
@@ -111,8 +108,7 @@ public class BatchSpoutTrigger implements IRichSpout {
     }
 
     @Override
-    public void open(Map conf, TopologyContext context,
-            SpoutOutputCollector collector) {
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector 
collector) {
         batchQueue = new LinkedBlockingQueue<BatchSpoutMsgId>();
         this.collector = collector;
         this.conf = conf;
@@ -134,7 +130,7 @@ public class BatchSpoutTrigger implements IRichSpout {
 
     @Override
     public void close() {
-       zkClient.close();
+        zkClient.close();
     }
 
     @Override
@@ -204,19 +200,16 @@ public class BatchSpoutTrigger implements IRichSpout {
 
             batchQueue.offer(msgId);
             if (intervalCheck.check()) {
-                LOG.info("Current msgId " + msgId
-                        + ", but current commit BatchId is " + currentBatchId);
+                LOG.info("Current msgId " + msgId + ", but current commit 
BatchId is " + currentBatchId);
             } else {
-                LOG.debug("Current msgId " + msgId
-                        + ", but current commit BatchId is " + currentBatchId);
+                LOG.debug("Current msgId " + msgId + ", but current commit 
BatchId is " + currentBatchId);
             }
 
             return;
         }
 
         String streamId = getStreamId(msgId.getBatchStatus());
-        List<Integer> outTasks =
-                collector.emit(streamId, new Values(msgId.getBatchId()), 
msgId);
+        List<Integer> outTasks = collector.emit(streamId, new 
Values(msgId.getBatchId()), msgId);
         if (outTasks.isEmpty()) {
             forward(msgId);
         }
@@ -278,8 +271,7 @@ public class BatchSpoutTrigger implements IRichSpout {
             forward((BatchSpoutMsgId) msgId);
             return;
         } else {
-            LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":"
-                    + msgId);
+            LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" 
+ msgId);
             return;
         }
     }
@@ -306,18 +298,15 @@ public class BatchSpoutTrigger implements IRichSpout {
         if (msgId instanceof BatchSpoutMsgId) {
             handleFail((BatchSpoutMsgId) msgId);
         } else {
-            LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":"
-                    + msgId);
+            LOG.warn("Unknown type msgId " + msgId.getClass().getName() + ":" + msgId);
             return;
         }
     }
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields(
-                "BatchId"));
-        declarer.declareStream(BatchDef.PREPARE_STREAM_ID,
-                new Fields("BatchId"));
+        declarer.declareStream(BatchDef.COMPUTING_STREAM_ID, new Fields("BatchId"));
+        declarer.declareStream(BatchDef.PREPARE_STREAM_ID, new Fields("BatchId"));
         declarer.declareStream(BatchDef.COMMIT_STREAM_ID, new Fields("BatchId"));
         declarer.declareStream(BatchDef.REVERT_STREAM_ID, new Fields("BatchId"));
         declarer.declareStream(BatchDef.POST_STREAM_ID, new Fields("BatchId"));

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
index c9bf0b5..63ca6a0 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/impl/CoordinatedBolt.java
@@ -75,23 +75,20 @@ public class CoordinatedBolt implements IRichBolt {
         try {
             zkClient = BatchCommon.getZkClient(conf);
 
-            zkCommitPath =
-                    BatchDef.ZK_COMMIT_DIR + BatchDef.ZK_SEPERATOR + taskId;
+            zkCommitPath = BatchDef.ZK_COMMIT_DIR + BatchDef.ZK_SEPERATOR + taskId;
             if (zkClient.node_existed(zkCommitPath, false)) {
                 zkClient.delete_node(zkCommitPath);
             }
             zkClient.mkdirs(zkCommitPath);
 
-            LOG.info(taskName + " successfully create commit path"
-                    + zkCommitPath);
+            LOG.info(taskName + " successfully create commit path" + zkCommitPath);
         } catch (Exception e) {
             LOG.error("Failed to create zk node", e);
             throw new RuntimeException();
         }
     }
 
-    public void prepare(Map conf, TopologyContext context,
-            OutputCollector collector) {
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
 
         taskId = String.valueOf(context.getThisTaskId());
         taskName = context.getThisComponentId() + "_" + context.getThisTaskId();
@@ -101,9 +98,7 @@ public class CoordinatedBolt implements IRichBolt {
 
         if (delegate instanceof ICommitter) {
             isCommiter = true;
-            commited =
-                    new TimeCacheMap<Object, Object>(
-                            context.maxTopologyMessageTimeout());
+            commited = new TimeCacheMap<Object, Object>(context.maxTopologyMessageTimeout());
             mkCommitDir(conf);
         }
 
@@ -130,8 +125,7 @@ public class CoordinatedBolt implements IRichBolt {
         });
 
         for (int index = 0; index < childs.size() - reserveSize; index++) {
-            zkClient.delete_node(path + BatchDef.ZK_SEPERATOR
-                    + childs.get(index));
+            zkClient.delete_node(path + BatchDef.ZK_SEPERATOR + childs.get(index));
         }
     }
 
@@ -263,8 +257,7 @@ public class CoordinatedBolt implements IRichBolt {
         } else if (batchStatus == BatchStatus.POST_COMMIT) {
             handlePostCommit(tuple);
         } else {
-            throw new RuntimeException(
-                    "Receive commit tuple, but not committer");
+            throw new RuntimeException("Receive commit tuple, but not committer");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
index fcc54fa..bee87ef 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/batch/util/BatchCommon.java
@@ -31,8 +31,7 @@ import com.alibaba.jstorm.cluster.DistributedClusterState;
 import com.alibaba.jstorm.utils.JStormUtils;
 
 public class BatchCommon {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(BatchCommon.class);
+    private static final Logger LOG = LoggerFactory.getLogger(BatchCommon.class);
 
     private static ClusterState zkClient = null;
 
@@ -44,26 +43,18 @@ public class BatchCommon {
 
             List<String> zkServers = null;
             if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS) != null) {
-                zkServers =
-                        (List<String>) conf
-                                .get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS);
+                zkServers = (List<String>) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS);
             } else if (conf.get(Config.STORM_ZOOKEEPER_SERVERS) != null) {
-                zkServers =
-                        (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+                zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
             } else {
                 throw new RuntimeException("No setting zk");
             }
 
             int port = 2181;
             if (conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT) != null) {
-                port =
-                        JStormUtils.parseInt(
-                                conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT),
-                                2181);
+                port = JStormUtils.parseInt(conf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT), 2181);
             } else if (conf.get(Config.STORM_ZOOKEEPER_PORT) != null) {
-                port =
-                        JStormUtils.parseInt(
-                                conf.get(Config.STORM_ZOOKEEPER_PORT), 2181);
+                port = JStormUtils.parseInt(conf.get(Config.STORM_ZOOKEEPER_PORT), 2181);
             }
 
             String root = BatchDef.BATCH_ZK_ROOT;
@@ -71,9 +62,7 @@ public class BatchCommon {
                 root = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
             }
 
-            root =
-                    root + BatchDef.ZK_SEPERATOR
-                            + conf.get(Config.TOPOLOGY_NAME);
+            root = root + BatchDef.ZK_SEPERATOR + conf.get(Config.TOPOLOGY_NAME);
 
             Map<Object, Object> tmpConf = new HashMap<Object, Object>();
             tmpConf.putAll(conf);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java
index a5a6835..37653eb 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/JStormCache.java
@@ -23,27 +23,26 @@ import java.util.Map;
 
 import com.alibaba.jstorm.client.ConfigExtension;
 
-
-
 public interface JStormCache extends Serializable {
     public static final String TAG_TIMEOUT_LIST = ConfigExtension.CACHE_TIMEOUT_LIST;
-    
-    void init(Map<Object, Object> conf)throws Exception;
+
+    void init(Map<Object, Object> conf) throws Exception;
+
     void cleanup();
-    
-    Object get(String key) ;
-    
+
+    Object get(String key);
+
     void getBatch(Map<String, Object> map);
 
     void remove(String key);
-    
+
     void removeBatch(Collection<String> keys);
 
     void put(String key, Object value, int timeoutSecond);
 
     void put(String key, Object value);
-    
-    void putBatch(Map<String, Object> map) ;
-    
+
+    void putBatch(Map<String, Object> map);
+
     void putBatch(Map<String, Object> map, int timeoutSeconds);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java
index e72e3d0..845ed1e 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksDBCache.java
@@ -17,101 +17,85 @@
  */
 package com.alibaba.jstorm.cache;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.commons.lang.StringUtils;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.WriteBatch;
-import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.PathUtils;
+import org.apache.commons.lang.StringUtils;
+import org.rocksdb.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.Map.Entry;
 
 public class RocksDBCache implements JStormCache {
     private static final long serialVersionUID = 705938812240167583L;
     private static Logger LOG = LoggerFactory.getLogger(RocksDBCache.class);
-    
+
     static {
         RocksDB.loadLibrary();
     }
-    
+
     public static final String ROCKSDB_ROOT_DIR = "rocksdb.root.dir";
     public static final String ROCKSDB_RESET = "rocksdb.reset";
     protected RocksDB db;
     protected String rootDir;
-    
+
     public void initDir(Map<Object, Object> conf) {
         String confDir = (String) conf.get(ROCKSDB_ROOT_DIR);
         if (StringUtils.isBlank(confDir) == true) {
             throw new RuntimeException("Doesn't set rootDir of rocksDB");
         }
-        
-        boolean clean = ConfigExtension.getNimbusCacheReset(conf);
+
+        boolean clean = (Boolean) conf.get(ROCKSDB_RESET);
         LOG.info("RocksDB reset is " + clean);
         if (clean == true) {
             try {
                 PathUtils.rmr(confDir);
             } catch (IOException e) {
-                // TODO Auto-generated catch block
                 throw new RuntimeException("Failed to cleanup rooDir of rocksDB " + confDir);
             }
         }
-        
+
         File file = new File(confDir);
         if (file.exists() == false) {
             try {
                 PathUtils.local_mkdirs(confDir);
                 file = new File(confDir);
             } catch (IOException e) {
-                // TODO Auto-generated catch block
                 throw new RuntimeException("Failed to mkdir rooDir of rocksDB " + confDir);
             }
         }
-        
+
         rootDir = file.getAbsolutePath();
     }
-    
+
     public void initDb(List<Integer> list) throws Exception {
         LOG.info("Begin to init rocksDB of {}", rootDir);
-        
+
         Options dbOptions = null;
-        
         try {
             dbOptions = new Options().setCreateMissingColumnFamilies(true).setCreateIfMissing(true);
-            
+
             List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<ColumnFamilyHandle>();
-            
+
             db = RocksDB.open(dbOptions, rootDir);
-            
+
             LOG.info("Successfully init rocksDB of {}", rootDir);
         } finally {
-            
             if (dbOptions != null) {
                 dbOptions.dispose();
             }
         }
     }
-    
+
     @Override
     public void init(Map<Object, Object> conf) throws Exception {
-        // TODO Auto-generated method stub
         initDir(conf);
-        
+
         List<Integer> list = new ArrayList<Integer>();
         if (conf.get(TAG_TIMEOUT_LIST) != null) {
             for (Object obj : (List) ConfigExtension.getCacheTimeoutList(conf)) {
@@ -119,11 +103,11 @@ public class RocksDBCache implements JStormCache {
                 if (timeoutSecond == null || timeoutSecond <= 0) {
                     continue;
                 }
-                
+
                 list.add(timeoutSecond);
             }
         }
-        
+
         // Add retry logic
         boolean isSuccess = false;
         for (int i = 0; i < 3; i++) {
@@ -135,32 +119,29 @@ public class RocksDBCache implements JStormCache {
                 LOG.warn("Failed to init rocksDB " + rootDir, e);
                 try {
                     PathUtils.rmr(rootDir);
-                } catch (IOException e1) {
-                    // TODO Auto-generated catch block
-                    
+                } catch (IOException ignored) {
                 }
             }
         }
-        
+
         if (isSuccess == false) {
             throw new RuntimeException("Failed to init rocksDB " + rootDir);
         }
     }
-    
+
     @Override
     public void cleanup() {
         LOG.info("Begin to close rocketDb of {}", rootDir);
-        
+
         if (db != null) {
             db.close();
         }
-        
+
         LOG.info("Successfully closed rocketDb of {}", rootDir);
     }
-    
+
     @Override
     public Object get(String key) {
-        // TODO Auto-generated method stub
         try {
             byte[] data = db.get(key.getBytes());
             if (data != null) {
@@ -172,36 +153,35 @@ public class RocksDBCache implements JStormCache {
                     return null;
                 }
             }
-            
+
         } catch (Exception e) {
-            
         }
-        
+
         return null;
     }
-    
+
     @Override
     public void getBatch(Map<String, Object> map) {
         List<byte[]> lookupKeys = new ArrayList<byte[]>();
         for (String key : map.keySet()) {
             lookupKeys.add(key.getBytes());
         }
-        
+
         try {
             Map<byte[], byte[]> results = db.multiGet(lookupKeys);
             if (results == null || results.size() == 0) {
                 return;
             }
-            
+
             for (Entry<byte[], byte[]> resultEntry : results.entrySet()) {
                 byte[] keyByte = resultEntry.getKey();
                 byte[] valueByte = resultEntry.getValue();
-                
+
                 if (keyByte == null || valueByte == null) {
                     continue;
                 }
-                
-                Object value = null;
+
+                Object value;
                 try {
                     value = Utils.javaDeserialize(valueByte);
                 } catch (Exception e) {
@@ -209,45 +189,37 @@ public class RocksDBCache implements JStormCache {
                     db.remove(keyByte);
                     continue;
                 }
-                
+
                 map.put(new String(keyByte), value);
             }
-            
-            return;
         } catch (Exception e) {
             LOG.error("Failed to query " + map.keySet() + ", in window: ");
         }
-        
-        return;
     }
-    
+
     @Override
     public void remove(String key) {
         try {
             db.remove(key.getBytes());
-            
+
         } catch (Exception e) {
             LOG.error("Failed to remove " + key);
         }
-        
+
     }
-    
+
     @Override
     public void removeBatch(Collection<String> keys) {
-        // TODO Auto-generated method stub
         for (String key : keys) {
             remove(key);
         }
     }
-    
+
     @Override
     public void put(String key, Object value, int timeoutSecond) {
-        // TODO Auto-generated method stub
-        
         put(key, value);
-        
     }
-    
+
     @Override
     public void put(String key, Object value) {
         byte[] data = Utils.javaSerialize(value);
@@ -255,38 +227,36 @@ public class RocksDBCache implements JStormCache {
             db.put(key.getBytes(), data);
         } catch (Exception e) {
             LOG.error("Failed put into cache, " + key, e);
-            return;
         }
     }
-    
+
     @Override
     public void putBatch(Map<String, Object> map) {
-        // TODO Auto-generated method stub
         WriteOptions writeOpts = null;
         WriteBatch writeBatch = null;
-        
+
         Set<byte[]> putKeys = new HashSet<byte[]>();
-        
+
         try {
             writeOpts = new WriteOptions();
             writeBatch = new WriteBatch();
-            
+
             for (Entry<String, Object> entry : map.entrySet()) {
                 String key = entry.getKey();
                 Object value = entry.getValue();
-                
+
                 byte[] data = Utils.javaSerialize(value);
-                
+
                 if (StringUtils.isBlank(key) || data == null || data.length == 0) {
                     continue;
                 }
-                
+
                 byte[] keyByte = key.getBytes();
                 writeBatch.put(keyByte, data);
-                
+
                 putKeys.add(keyByte);
             }
-            
+
             db.write(writeOpts, writeBatch);
         } catch (Exception e) {
             LOG.error("Failed to putBatch into DB, " + map.keySet(), e);
@@ -294,18 +264,16 @@ public class RocksDBCache implements JStormCache {
             if (writeOpts != null) {
                 writeOpts.dispose();
             }
-            
+
             if (writeBatch != null) {
                 writeBatch.dispose();
             }
         }
-        
+
     }
-    
+
     @Override
     public void putBatch(Map<String, Object> map, int timeoutSeconds) {
-        // TODO Auto-generated method stub
         putBatch(map);
     }
-    
 }

Reply via email to