http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java
index 20b5f46..426e074 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java
@@ -48,23 +48,23 @@ import com.alibaba.jstorm.utils.PathUtils;
 public class RocksTTLDBCache implements JStormCache {
     private static final long serialVersionUID = 705938812240167583L;
     private static Logger LOG = LoggerFactory.getLogger(RocksTTLDBCache.class);
-    
+
     static {
         RocksDB.loadLibrary();
     }
-    
+
     public static final String ROCKSDB_ROOT_DIR = "rocksdb.root.dir";
     public static final String ROCKSDB_RESET = "rocksdb.reset";
     protected TtlDB ttlDB;
     protected String rootDir;
     protected TreeMap<Integer, ColumnFamilyHandle> windowHandlers = new 
TreeMap<Integer, ColumnFamilyHandle>();
-    
+
     public void initDir(Map<Object, Object> conf) {
         String confDir = (String) conf.get(ROCKSDB_ROOT_DIR);
         if (StringUtils.isBlank(confDir) == true) {
             throw new RuntimeException("Doesn't set rootDir of rocksDB");
         }
-        
+
         boolean clean = ConfigExtension.getNimbusCacheReset(conf);
         LOG.info("RocksDB reset is " + clean);
         if (clean == true) {
@@ -75,7 +75,7 @@ public class RocksTTLDBCache implements JStormCache {
                 throw new RuntimeException("Failed to cleanup rooDir of 
rocksDB " + confDir);
             }
         }
-        
+
         File file = new File(confDir);
         if (file.exists() == false) {
             try {
@@ -86,53 +86,53 @@ public class RocksTTLDBCache implements JStormCache {
                 throw new RuntimeException("Failed to mkdir rooDir of rocksDB 
" + confDir);
             }
         }
-        
+
         rootDir = file.getAbsolutePath();
     }
-    
-    public void initDb(List<Integer> list) throws Exception{
+
+    public void initDb(List<Integer> list) throws Exception {
         LOG.info("Begin to init rocksDB of {}", rootDir);
-        
+
         DBOptions dbOptions = null;
-        
+
         List<ColumnFamilyDescriptor> columnFamilyNames = new 
ArrayList<ColumnFamilyDescriptor>();
         columnFamilyNames.add(new 
ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
         for (Integer timeout : list) {
             columnFamilyNames.add(new 
ColumnFamilyDescriptor(String.valueOf(timeout).getBytes()));
         }
-        
+
         List<Integer> ttlValues = new ArrayList<Integer>();
         // Default column family with infinite lifetime
         // ATTENSION, the first must be 0, RocksDB.java API has this limitation
         ttlValues.add(0);
         // new column family with list second ttl
         ttlValues.addAll(list);
-        
+
         try {
             dbOptions = new 
DBOptions().setCreateMissingColumnFamilies(true).setCreateIfMissing(true);
-            
+
             List<ColumnFamilyHandle> columnFamilyHandleList = new 
ArrayList<ColumnFamilyHandle>();
-            
+
             ttlDB = TtlDB.open(dbOptions, rootDir, columnFamilyNames, 
columnFamilyHandleList, ttlValues, false);
-            
+
             for (int i = 0; i < ttlValues.size(); i++) {
                 windowHandlers.put(ttlValues.get(i), 
columnFamilyHandleList.get(i));
             }
-            
+
             LOG.info("Successfully init rocksDB of {}", rootDir);
-        }  finally {
-            
+        } finally {
+
             if (dbOptions != null) {
                 dbOptions.dispose();
             }
         }
     }
-    
+
     @Override
-    public void init(Map<Object, Object> conf) throws Exception{
+    public void init(Map<Object, Object> conf) throws Exception {
         // TODO Auto-generated method stub
         initDir(conf);
-        
+
         List<Integer> list = new ArrayList<Integer>();
         if (conf.get(TAG_TIMEOUT_LIST) != null) {
             for (Object obj : (List) 
ConfigExtension.getCacheTimeoutList(conf)) {
@@ -140,11 +140,11 @@ public class RocksTTLDBCache implements JStormCache {
                 if (timeoutSecond == null || timeoutSecond <= 0) {
                     continue;
                 }
-                
+
                 list.add(timeoutSecond);
             }
         }
-        
+
         // Add retry logic
         boolean isSuccess = false;
         for (int i = 0; i < 3; i++) {
@@ -152,64 +152,61 @@ public class RocksTTLDBCache implements JStormCache {
                 initDb(list);
                 isSuccess = true;
                 break;
-            }catch(Exception e) {
+            } catch (Exception e) {
                 LOG.warn("Failed to init rocksDB " + rootDir, e);
                 try {
                     PathUtils.rmr(rootDir);
                 } catch (IOException e1) {
                     // TODO Auto-generated catch block
-                    
+
                 }
             }
         }
-        
+
         if (isSuccess == false) {
             throw new RuntimeException("Failed to init rocksDB " + rootDir);
         }
     }
-    
+
     @Override
     public void cleanup() {
         LOG.info("Begin to close rocketDb of {}", rootDir);
-        
+
         for (ColumnFamilyHandle columnFamilyHandle : windowHandlers.values()) {
             columnFamilyHandle.dispose();
         }
-        
+
         if (ttlDB != null) {
             ttlDB.close();
         }
-        
+
         LOG.info("Successfully closed rocketDb of {}", rootDir);
     }
-    
+
     @Override
-    public Object get(String key)  {
+    public Object get(String key) {
         // TODO Auto-generated method stub
         for (Entry<Integer, ColumnFamilyHandle> entry : 
windowHandlers.entrySet()) {
             try {
-                byte[] data = ttlDB.get(entry.getValue(),
-                            key.getBytes());
+                byte[] data = ttlDB.get(entry.getValue(), key.getBytes());
                 if (data != null) {
                     try {
                         return Utils.javaDeserialize(data);
-                    }catch(Exception e) {
+                    } catch (Exception e) {
                         LOG.error("Failed to deserialize obj of " + key);
-                        ttlDB.remove(entry.getValue(),
-                                        key.getBytes());
+                        ttlDB.remove(entry.getValue(), key.getBytes());
                         return null;
                     }
                 }
-                
-                
-            }catch(Exception e) {
-                
+
+            } catch (Exception e) {
+
             }
         }
-        
+
         return null;
     }
-    
+
     @Override
     public void getBatch(Map<String, Object> map) {
         List<byte[]> lookupKeys = new ArrayList<byte[]>();
@@ -217,26 +214,26 @@ public class RocksTTLDBCache implements JStormCache {
             lookupKeys.add(key.getBytes());
         }
         for (Entry<Integer, ColumnFamilyHandle> entry : 
windowHandlers.entrySet()) {
-            
+
             List<ColumnFamilyHandle> cfHandlers = new 
ArrayList<ColumnFamilyHandle>();
             for (String key : map.keySet()) {
                 cfHandlers.add(entry.getValue());
             }
-            
+
             try {
                 Map<byte[], byte[]> results = ttlDB.multiGet(cfHandlers, 
lookupKeys);
                 if (results == null || results.size() == 0) {
                     continue;
                 }
-                
+
                 for (Entry<byte[], byte[]> resultEntry : results.entrySet()) {
                     byte[] keyByte = resultEntry.getKey();
                     byte[] valueByte = resultEntry.getValue();
-                    
+
                     if (keyByte == null || valueByte == null) {
                         continue;
                     }
-                    
+
                     Object value = null;
                     try {
                         value = Utils.javaDeserialize(valueByte);
@@ -245,35 +242,31 @@ public class RocksTTLDBCache implements JStormCache {
                         ttlDB.remove(entry.getValue(), keyByte);
                         continue;
                     }
-                    
+
                     map.put(new String(keyByte), value);
                 }
-                
-                return ;
+
+                return;
             } catch (Exception e) {
                 LOG.error("Failed to query " + map.keySet() + ", in window: " 
+ entry.getKey());
             }
         }
-        
+
         return;
     }
-    
-    
+
     @Override
     public void remove(String key) {
         for (Entry<Integer, ColumnFamilyHandle> entry : 
windowHandlers.entrySet()) {
             try {
-                ttlDB.remove(entry.getValue(),
-                            key.getBytes());
-                
-                
-                
-            }catch(Exception e) {
+                ttlDB.remove(entry.getValue(), key.getBytes());
+
+            } catch (Exception e) {
                 LOG.error("Failed to remove " + key);
             }
         }
     }
-    
+
     @Override
     public void removeBatch(Collection<String> keys) {
         // TODO Auto-generated method stub
@@ -281,22 +274,22 @@ public class RocksTTLDBCache implements JStormCache {
             remove(key);
         }
     }
-    
+
     protected void put(String key, Object value, Entry<Integer, 
ColumnFamilyHandle> entry) {
-        
+
         byte[] data = Utils.javaSerialize(value);
         try {
             ttlDB.put(entry.getValue(), key.getBytes(), data);
-        }catch(Exception e) {
+        } catch (Exception e) {
             LOG.error("Failed put into cache, " + key, e);
-            return ;
+            return;
         }
-        
+
         for (Entry<Integer, ColumnFamilyHandle> removeEntry : 
windowHandlers.entrySet()) {
             if (removeEntry.getKey().equals(entry.getKey())) {
                 continue;
             }
-            
+
             try {
                 ttlDB.remove(removeEntry.getValue(), key.getBytes());
             } catch (Exception e) {
@@ -305,72 +298,70 @@ public class RocksTTLDBCache implements JStormCache {
             }
         }
     }
-    
+
     protected Entry<Integer, ColumnFamilyHandle> getHandler(int timeoutSecond) 
{
         ColumnFamilyHandle cfHandler = null;
         Entry<Integer, ColumnFamilyHandle> ceilingEntry = 
windowHandlers.ceilingEntry(timeoutSecond);
         if (ceilingEntry != null) {
             return ceilingEntry;
-        }else {
+        } else {
             return windowHandlers.firstEntry();
         }
     }
-    
+
     @Override
     public void put(String key, Object value, int timeoutSecond) {
         // TODO Auto-generated method stub
-        
-        
+
         put(key, value, getHandler(timeoutSecond));
-        
+
     }
-    
+
     @Override
-    public void put(String key, Object value)  {
+    public void put(String key, Object value) {
         put(key, value, windowHandlers.firstEntry());
     }
-    
-    protected void putBatch(Map<String, Object> map, Entry<Integer, 
ColumnFamilyHandle> putEntry )  {
+
+    protected void putBatch(Map<String, Object> map, Entry<Integer, 
ColumnFamilyHandle> putEntry) {
         // TODO Auto-generated method stub
         WriteOptions writeOpts = null;
         WriteBatch writeBatch = null;
-        
+
         Set<byte[]> putKeys = new HashSet<byte[]>();
-        
+
         try {
             writeOpts = new WriteOptions();
             writeBatch = new WriteBatch();
-            
+
             for (Entry<String, Object> entry : map.entrySet()) {
                 String key = entry.getKey();
                 Object value = entry.getValue();
-                
-                
+
                 byte[] data = Utils.javaSerialize(value);
-                
+
                 if (StringUtils.isBlank(key) || data == null || data.length == 
0) {
                     continue;
                 }
-                
+
                 byte[] keyByte = key.getBytes();
                 writeBatch.put(putEntry.getValue(), keyByte, data);
-                
+
                 putKeys.add(keyByte);
             }
-            
+
             ttlDB.write(writeOpts, writeBatch);
-        }catch(Exception e) {
+        } catch (Exception e) {
             LOG.error("Failed to putBatch into DB, " + map.keySet(), e);
-        }finally {
+        } finally {
             if (writeOpts != null) {
                 writeOpts.dispose();
             }
-            
+
             if (writeBatch != null) {
                 writeBatch.dispose();
             }
         }
-        
+
         for (Entry<Integer, ColumnFamilyHandle> entry : 
windowHandlers.entrySet()) {
             if (entry.getKey().equals(putEntry.getKey())) {
                 continue;
@@ -385,85 +376,85 @@ public class RocksTTLDBCache implements JStormCache {
             }
         }
     }
-    
+
     @Override
-    public void putBatch(Map<String, Object> map)  {
+    public void putBatch(Map<String, Object> map) {
         // TODO Auto-generated method stub
         putBatch(map, windowHandlers.firstEntry());
     }
-    
+
     @Override
     public void putBatch(Map<String, Object> map, int timeoutSeconds) {
         // TODO Auto-generated method stub
         putBatch(map, getHandler(timeoutSeconds));
     }
-    
-//    public void put() throws Exception {
-
-//    }
-//    
-//    public void write() throws Exception {
-//        Options options = null;
-//        WriteBatch wb1 = null;
-//        WriteBatch wb2 = null;
-//        WriteOptions opts = null;
-//        try {
-//            options = new Options().setMergeOperator(new 
StringAppendOperator()).setCreateIfMissing(true);
-//            db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
-//            opts = new WriteOptions();
-//            wb1 = new WriteBatch();
-//            wb1.put("key1".getBytes(), "aa".getBytes());
-//            wb1.merge("key1".getBytes(), "bb".getBytes());
-//            wb2 = new WriteBatch();
-//            wb2.put("key2".getBytes(), "xx".getBytes());
-//            wb2.merge("key2".getBytes(), "yy".getBytes());
-//            db.write(opts, wb1);
-//            db.write(opts, wb2);
-//            
assertThat(db.get("key1".getBytes())).isEqualTo("aa,bb".getBytes());
-//            
assertThat(db.get("key2".getBytes())).isEqualTo("xx,yy".getBytes());
-//        } finally {
-//            if (db != null) {
-//                db.close();
-//            }
-//            if (wb1 != null) {
-//                wb1.dispose();
-//            }
-//            if (wb2 != null) {
-//                wb2.dispose();
-//            }
-//            if (options != null) {
-//                options.dispose();
-//            }
-//            if (opts != null) {
-//                opts.dispose();
-//            }
-//        }
-//    }
-//    
-
-//    
-//    public void remove() throws Exception {
-//        RocksDB db = null;
-//        WriteOptions wOpt;
-//        try {
-//            wOpt = new WriteOptions();
-//            db = RocksDB.open(dbFolder.getRoot().getAbsolutePath());
-//            db.put("key1".getBytes(), "value".getBytes());
-//            db.put("key2".getBytes(), "12345678".getBytes());
-//            
assertThat(db.get("key1".getBytes())).isEqualTo("value".getBytes());
-//            
assertThat(db.get("key2".getBytes())).isEqualTo("12345678".getBytes());
-//            db.remove("key1".getBytes());
-//            db.remove(wOpt, "key2".getBytes());
-//            assertThat(db.get("key1".getBytes())).isNull();
-//            assertThat(db.get("key2".getBytes())).isNull();
-//        } finally {
-//            if (db != null) {
-//                db.close();
-//            }
-//        }
-//    }
-//    
-//    public void ttlDbOpenWithColumnFamilies() throws Exception, 
InterruptedException {
-//        
-//    }
+
+    // public void put() throws Exception {
+
+    // }
+    //
+    // public void write() throws Exception {
+    // Options options = null;
+    // WriteBatch wb1 = null;
+    // WriteBatch wb2 = null;
+    // WriteOptions opts = null;
+    // try {
+    // options = new Options().setMergeOperator(new 
StringAppendOperator()).setCreateIfMissing(true);
+    // db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath());
+    // opts = new WriteOptions();
+    // wb1 = new WriteBatch();
+    // wb1.put("key1".getBytes(), "aa".getBytes());
+    // wb1.merge("key1".getBytes(), "bb".getBytes());
+    // wb2 = new WriteBatch();
+    // wb2.put("key2".getBytes(), "xx".getBytes());
+    // wb2.merge("key2".getBytes(), "yy".getBytes());
+    // db.write(opts, wb1);
+    // db.write(opts, wb2);
+    // assertThat(db.get("key1".getBytes())).isEqualTo("aa,bb".getBytes());
+    // assertThat(db.get("key2".getBytes())).isEqualTo("xx,yy".getBytes());
+    // } finally {
+    // if (db != null) {
+    // db.close();
+    // }
+    // if (wb1 != null) {
+    // wb1.dispose();
+    // }
+    // if (wb2 != null) {
+    // wb2.dispose();
+    // }
+    // if (options != null) {
+    // options.dispose();
+    // }
+    // if (opts != null) {
+    // opts.dispose();
+    // }
+    // }
+    // }
+    //
+
+    //
+    // public void remove() throws Exception {
+    // RocksDB db = null;
+    // WriteOptions wOpt;
+    // try {
+    // wOpt = new WriteOptions();
+    // db = RocksDB.open(dbFolder.getRoot().getAbsolutePath());
+    // db.put("key1".getBytes(), "value".getBytes());
+    // db.put("key2".getBytes(), "12345678".getBytes());
+    // assertThat(db.get("key1".getBytes())).isEqualTo("value".getBytes());
+    // assertThat(db.get("key2".getBytes())).isEqualTo("12345678".getBytes());
+    // db.remove("key1".getBytes());
+    // db.remove(wOpt, "key2".getBytes());
+    // assertThat(db.get("key1".getBytes())).isNull();
+    // assertThat(db.get("key2".getBytes())).isNull();
+    // } finally {
+    // if (db != null) {
+    // db.close();
+    // }
+    // }
+    // }
+    //
+    // public void ttlDbOpenWithColumnFamilies() throws Exception, 
InterruptedException {
+    //
+    // }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java
index d4d9905..8924a81 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java
@@ -17,50 +17,46 @@
  */
 package com.alibaba.jstorm.cache;
 
+import com.alibaba.jstorm.client.ConfigExtension;
+import com.alibaba.jstorm.utils.JStormUtils;
+import com.alibaba.jstorm.utils.TimeCacheMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.client.ConfigExtension;
-import com.alibaba.jstorm.utils.JStormUtils;
-import com.alibaba.jstorm.utils.TimeCacheMap;
-
 public class TimeoutMemCache implements JStormCache {
     private static final long serialVersionUID = 705938812240167583L;
     private static Logger LOG = LoggerFactory.getLogger(TimeoutMemCache.class);
-    
-   
+
     protected int defaultTimeout;
     protected final TreeMap<Integer, TimeCacheMap<String, Object>> 
cacheWindows = new TreeMap<Integer, TimeCacheMap<String, Object>>();
-    
+
     public TimeoutMemCache() {
-        
     }
-    
+
     protected void registerCacheWindow(int timeoutSecond) {
         synchronized (this) {
             if (cacheWindows.get(timeoutSecond) == null) {
                 TimeCacheMap<String, Object> cacheWindow = new 
TimeCacheMap<String, Object>(timeoutSecond);
                 cacheWindows.put(timeoutSecond, cacheWindow);
-                
+
                 LOG.info("Successfully register CacheWindow: " + 
timeoutSecond);
             } else {
                 LOG.info("CacheWindow: " + timeoutSecond + " has been 
registered");
             }
         }
     }
-    
+
     @Override
     public void init(Map<Object, Object> conf) {
-        // TODO Auto-generated method stub
         this.defaultTimeout = ConfigExtension.getDefaultCacheTimeout(conf);
         registerCacheWindow(defaultTimeout);
-        
+
         List<Object> list = (List) ConfigExtension.getCacheTimeoutList(conf);
         if (list != null) {
             for (Object obj : list) {
@@ -68,21 +64,17 @@ public class TimeoutMemCache implements JStormCache {
                 if (timeoutSecond == null) {
                     continue;
                 }
-                
                 registerCacheWindow(timeoutSecond);
             }
         }
     }
-    
+
     @Override
     public void cleanup() {
-        // TODO Auto-generated method stub
-        
     }
-    
+
     @Override
     public Object get(String key) {
-        // TODO Auto-generated method stub
         // @@@ TODO
         // in order to improve performance, it can be query from defaultWindow 
firstly, then others
         for (TimeCacheMap<String, Object> cacheWindow : cacheWindows.values()) 
{
@@ -93,21 +85,17 @@ public class TimeoutMemCache implements JStormCache {
         }
         return null;
     }
-    
+
     @Override
     public void getBatch(Map<String, Object> map) {
-        // TODO Auto-generated method stub
         for (String key : map.keySet()) {
             Object obj = get(key);
             map.put(key, obj);
         }
-        
-        return;
     }
-    
+
     @Override
     public void remove(String key) {
-        // TODO Auto-generated method stub
         for (TimeCacheMap<String, Object> cacheWindow : cacheWindows.values()) 
{
             Object ret = cacheWindow.remove(key);
             if (ret != null) {
@@ -115,64 +103,52 @@ public class TimeoutMemCache implements JStormCache {
             }
         }
     }
-    
+
     @Override
     public void removeBatch(Collection<String> keys) {
-        // TODO Auto-generated method stub
         for (String key : keys) {
             remove(key);
         }
-        
-        return;
     }
-    
+
     @Override
     public void put(String key, Object value, int timeoutSecond) {
-        
-        // TODO Auto-generated method stub
         Entry<Integer, TimeCacheMap<String, Object>> ceilingEntry = 
cacheWindows.ceilingEntry(timeoutSecond);
         if (ceilingEntry == null) {
             put(key, value);
-            return ;
-        }else {
+        } else {
             remove(key);
             ceilingEntry.getValue().put(key, value);
         }
-        
     }
-    
+
     @Override
     public void put(String key, Object value) {
         remove(key);
         TimeCacheMap<String, Object> bestWindow = 
cacheWindows.get(defaultTimeout);
         bestWindow.put(key, value);
     }
-    
+
     @Override
-    public void putBatch(Map<String, Object> map)  {
-        // TODO Auto-generated method stub
+    public void putBatch(Map<String, Object> map) {
         for (Entry<String, Object> entry : map.entrySet()) {
             put(entry.getKey(), entry.getValue());
         }
-        
     }
-    
+
     @Override
     public void putBatch(Map<String, Object> map, int timeoutSeconds) {
-        // TODO Auto-generated method stub
         for (Entry<String, Object> entry : map.entrySet()) {
             put(entry.getKey(), entry.getValue(), timeoutSeconds);
         }
-        
     }
 
-       public int getDefaultTimeout() {
-               return defaultTimeout;
-       }
+    public int getDefaultTimeout() {
+        return defaultTimeout;
+    }
+
+    public void setDefaultTimeout(int defaultTimeout) {
+        this.defaultTimeout = defaultTimeout;
+    }
 
-       public void setDefaultTimeout(int defaultTimeout) {
-               this.defaultTimeout = defaultTimeout;
-       }
-    
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
index d21cc4a..e4466e7 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java
@@ -33,8 +33,7 @@ import com.alibaba.jstorm.utils.JStormUtils;
  * 
  */
 public class AsyncLoopRunnable implements Runnable {
-    private static Logger LOG = LoggerFactory
-            .getLogger(AsyncLoopRunnable.class);
+    private static Logger LOG = 
LoggerFactory.getLogger(AsyncLoopRunnable.class);
 
     // set shutdown as false is to
     private static AtomicBoolean shutdown = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
index ce49c51..2f722b9 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java
@@ -28,16 +28,14 @@ import com.alibaba.jstorm.utils.JStormUtils;
 import com.alibaba.jstorm.utils.SmartThread;
 
 /**
- * Wrapper Timer thread Every several seconds execute afn, if something is run,
- * run kill_fn
+ * Wrapper Timer thread Every several seconds execute afn, if something is 
run, run kill_fn
  * 
  * 
  * @author yannian
  * 
  */
 public class AsyncLoopThread implements SmartThread {
-    private static final Logger LOG = LoggerFactory
-            .getLogger(AsyncLoopThread.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AsyncLoopThread.class);
 
     private Thread thread;
 
@@ -47,18 +45,15 @@ public class AsyncLoopThread implements SmartThread {
         this.init(afn, false, Thread.NORM_PRIORITY, true);
     }
 
-    public AsyncLoopThread(RunnableCallback afn, boolean daemon, int priority,
-            boolean start) {
+    public AsyncLoopThread(RunnableCallback afn, boolean daemon, int priority, 
boolean start) {
         this.init(afn, daemon, priority, start);
     }
 
-    public AsyncLoopThread(RunnableCallback afn, boolean daemon,
-            RunnableCallback kill_fn, int priority, boolean start) {
+    public AsyncLoopThread(RunnableCallback afn, boolean daemon, 
RunnableCallback kill_fn, int priority, boolean start) {
         this.init(afn, daemon, kill_fn, priority, start);
     }
 
-    public void init(RunnableCallback afn, boolean daemon, int priority,
-            boolean start) {
+    public void init(RunnableCallback afn, boolean daemon, int priority, 
boolean start) {
         RunnableCallback kill_fn = new AsyncLoopDefaultKill();
         this.init(afn, daemon, kill_fn, priority, start);
     }
@@ -72,8 +67,7 @@ public class AsyncLoopThread implements SmartThread {
      * @param args_fn
      * @param start
      */
-    private void init(RunnableCallback afn, boolean daemon,
-            RunnableCallback kill_fn, int priority, boolean start) {
+    private void init(RunnableCallback afn, boolean daemon, RunnableCallback 
kill_fn, int priority, boolean start) {
         if (kill_fn == null) {
             kill_fn = new AsyncLoopDefaultKill();
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
index 4f1764b..132418f 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java
@@ -33,13 +33,11 @@ import com.alibaba.jstorm.zk.ZkKeeperStates;
  */
 public class DefaultWatcherCallBack implements WatcherCallBack {
 
-    private static Logger LOG = LoggerFactory
-            .getLogger(DefaultWatcherCallBack.class);
+    private static Logger LOG = 
LoggerFactory.getLogger(DefaultWatcherCallBack.class);
 
     @Override
     public void execute(KeeperState state, EventType type, String path) {
-        LOG.info("Zookeeper state update:" + ZkKeeperStates.getStateName(state)
-                + "," + ZkEventTypes.getStateName(type) + "," + path);
+        LOG.info("Zookeeper state update:" + 
ZkKeeperStates.getStateName(state) + "," + ZkEventTypes.getStateName(type) + 
"," + path);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
index 79ef633..5670220 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java
@@ -36,15 +36,13 @@ import com.alibaba.jstorm.utils.JStormUtils;
 /**
  * 
  * 
- * The action when nimbus receive kill command 1. set the topology status as
- * target 2. wait 2 * Timeout seconds later, do removing topology from ZK
+ * The action when nimbus receive kill command 1. set the topology status as 
target 2. wait 2 * Timeout seconds later, do removing topology from ZK
  * 
  * @author Longda
  */
 public class DelayStatusTransitionCallback extends BaseCallback {
 
-    private static Logger LOG = LoggerFactory
-            .getLogger(DelayStatusTransitionCallback.class);
+    private static Logger LOG = 
LoggerFactory.getLogger(DelayStatusTransitionCallback.class);
 
     public static final int DEFAULT_DELAY_SECONDS = 30;
 
@@ -54,8 +52,7 @@ public class DelayStatusTransitionCallback extends 
BaseCallback {
     protected StatusType newType;
     protected StatusType nextAction;
 
-    public DelayStatusTransitionCallback(NimbusData data, String topologyid,
-            StormStatus oldStatus, StatusType newType, StatusType nextAction) {
+    public DelayStatusTransitionCallback(NimbusData data, String topologyid, 
StormStatus oldStatus, StatusType newType, StatusType nextAction) {
         this.data = data;
         this.topologyid = topologyid;
         this.oldStatus = oldStatus;
@@ -73,13 +70,8 @@ public class DelayStatusTransitionCallback extends 
BaseCallback {
             Map<?, ?> map = null;
             try {
 
-                map =
-                        StormConfig.read_nimbus_topology_conf(data.getConf(),
-                                topologyid);
-                delaySecs =
-                        JStormUtils.parseInt(
-                                map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS),
-                                DEFAULT_DELAY_SECONDS);
+                map = StormConfig.read_nimbus_topology_conf(data.getConf(), 
topologyid);
+                delaySecs = 
JStormUtils.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), 
DEFAULT_DELAY_SECONDS);
             } catch (Exception e) {
                 LOG.info("Failed to get topology configuration " + topologyid);
             }
@@ -98,12 +90,9 @@ public class DelayStatusTransitionCallback extends 
BaseCallback {
     @Override
     public <T> Object execute(T... args) {
         int delaySecs = getDelaySeconds(args);
-        LOG.info("Delaying event " + newType + " for " + delaySecs
-                + " secs for " + topologyid);
+        LOG.info("Delaying event " + newType + " for " + delaySecs + " secs 
for " + topologyid);
 
-        data.getScheduExec().schedule(
-                new DelayEventRunnable(data, topologyid, nextAction, args),
-                delaySecs, TimeUnit.SECONDS);
+        data.getScheduExec().schedule(new DelayEventRunnable(data, topologyid, 
nextAction, args), delaySecs, TimeUnit.SECONDS);
 
         return new StormStatus(delaySecs, newType);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java
index 41706b3..5060aec 100644
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java
@@ -17,54 +17,44 @@
  */
 package com.alibaba.jstorm.callback.impl;
 
-import java.util.*;
-import java.util.Map.Entry;
-
-import org.apache.log4j.Logger;
-
 import backtype.storm.Config;
 import backtype.storm.generated.Bolt;
 import backtype.storm.generated.SpoutSpec;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.utils.Utils;
-
 import com.alibaba.jstorm.callback.BaseCallback;
-import com.alibaba.jstorm.client.ConfigExtension;
 import com.alibaba.jstorm.cluster.Common;
 import com.alibaba.jstorm.cluster.StormClusterState;
 import com.alibaba.jstorm.cluster.StormConfig;
 import com.alibaba.jstorm.cluster.StormStatus;
-import com.alibaba.jstorm.daemon.nimbus.NimbusData;
-import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
-import com.alibaba.jstorm.daemon.nimbus.StatusType;
-import com.alibaba.jstorm.daemon.nimbus.TopologyAssign;
-import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent;
+import com.alibaba.jstorm.daemon.nimbus.*;
 import com.alibaba.jstorm.task.TaskInfo;
 import com.alibaba.jstorm.task.TkHbCacheTime;
 import com.alibaba.jstorm.utils.JStormUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.Map.Entry;
 
 /**
  * Do real rebalance action.
  * 
- * After nimbus receive one rebalance command, it will do as following: 1. set
- * topology status as rebalancing 2. delay 2 * timeout seconds 3. do this
- * callback
+ * After nimbus receive one rebalance command, it will do as following: 1. set topology status as rebalancing 2. delay 2 * timeout seconds 3. do this callback
  * 
  * @author Xin.Li/Longda
  * 
  */
 public class DoRebalanceTransitionCallback extends BaseCallback {
 
-    private static Logger LOG = Logger
-            .getLogger(DoRebalanceTransitionCallback.class);
+    private static Logger LOG = 
LoggerFactory.getLogger(DoRebalanceTransitionCallback.class);
 
     private NimbusData data;
     private String topologyid;
     private StormStatus oldStatus;
     private Set<Integer> newTasks;
 
-    public DoRebalanceTransitionCallback(NimbusData data, String topologyid,
-            StormStatus status) {
+    public DoRebalanceTransitionCallback(NimbusData data, String topologyid, 
StormStatus status) {
         this.data = data;
         this.topologyid = topologyid;
         this.oldStatus = status;
@@ -87,28 +77,17 @@ public class DoRebalanceTransitionCallback extends 
BaseCallback {
                 Map stormConf = data.getConf();
 
                 // Update topology code
-                Map topoConf =
-                        StormConfig.read_nimbus_topology_conf(stormConf,
-                                topologyid);
-                StormTopology rawOldTopology =
-                        StormConfig.read_nimbus_topology_code(stormConf,
-                                topologyid);
-                StormTopology rawNewTopology =
-                        NimbusUtils.normalizeTopology(conf, rawOldTopology,
-                                true);
+                Map topoConf = 
StormConfig.read_nimbus_topology_conf(stormConf, topologyid);
+                StormTopology rawOldTopology = 
StormConfig.read_nimbus_topology_code(stormConf, topologyid);
+                StormTopology rawNewTopology = 
NimbusUtils.normalizeTopology(conf, rawOldTopology, true);
                 StormTopology sysOldTopology = rawOldTopology.deepCopy();
                 StormTopology sysNewTopology = rawNewTopology.deepCopy();
                 if (conf.get(Config.TOPOLOGY_ACKER_EXECUTORS) != null) {
                     Common.add_acker(topoConf, sysOldTopology);
                     Common.add_acker(conf, sysNewTopology);
-                    int ackerNum =
-                            JStormUtils.parseInt(conf
-                                    .get(Config.TOPOLOGY_ACKER_EXECUTORS));
-                    int oldAckerNum =
-                            JStormUtils.parseInt(topoConf
-                                    .get(Config.TOPOLOGY_ACKER_EXECUTORS));
-                    LOG.info("Update acker from oldAckerNum=" + oldAckerNum
-                            + " to ackerNum=" + ackerNum);
+                    int ackerNum = 
JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
+                    int oldAckerNum = 
JStormUtils.parseInt(topoConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
+                    LOG.info("Update acker from oldAckerNum=" + oldAckerNum + 
" to ackerNum=" + ackerNum);
                     topoConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, ackerNum);
                     isConfUpdate = true;
                 }
@@ -118,32 +97,25 @@ public class DoRebalanceTransitionCallback extends 
BaseCallback {
                 isSetTaskInfo = true;
 
                 // If everything is OK, write topology code into disk
-                StormConfig.write_nimbus_topology_code(stormConf, topologyid,
-                        Utils.serialize(rawNewTopology));
+                StormConfig.write_nimbus_topology_code(stormConf, topologyid, 
Utils.serialize(rawNewTopology));
 
                 // Update topology conf if worker num has been updated
                 Set<Object> keys = conf.keySet();
-                Integer workerNum =
-                        
JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS));
+                Integer workerNum = 
JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS));
                 if (workerNum != null) {
-                    Integer oldWorkerNum =
-                            JStormUtils.parseInt(topoConf
-                                    .get(Config.TOPOLOGY_WORKERS));
+                    Integer oldWorkerNum = 
JStormUtils.parseInt(topoConf.get(Config.TOPOLOGY_WORKERS));
                     topoConf.put(Config.TOPOLOGY_WORKERS, workerNum);
                     isConfUpdate = true;
 
-                    LOG.info("Update worker num from " + oldWorkerNum + " to "
-                            + workerNum);
+                    LOG.info("Update worker num from " + oldWorkerNum + " to " 
+ workerNum);
                 }
 
                 if (keys.contains(Config.ISOLATION_SCHEDULER_MACHINES)) {
-                    topoConf.put(Config.ISOLATION_SCHEDULER_MACHINES,
-                            conf.get(Config.ISOLATION_SCHEDULER_MACHINES));
+                    topoConf.put(Config.ISOLATION_SCHEDULER_MACHINES, 
conf.get(Config.ISOLATION_SCHEDULER_MACHINES));
                 }
 
                 if (isConfUpdate) {
-                    StormConfig.write_nimbus_topology_conf(stormConf,
-                            topologyid, topoConf);
+                    StormConfig.write_nimbus_topology_conf(stormConf, 
topologyid, topoConf);
                 }
             }
 
@@ -153,85 +125,66 @@ public class DoRebalanceTransitionCallback extends 
BaseCallback {
             event.setScratch(true);
             event.setOldStatus(oldStatus);
             event.setReassign(reassign);
-
+            if (conf != null)
+                event.setScaleTopology(true);
             TopologyAssign.push(event);
+            event.waitFinish();
         } catch (Exception e) {
             LOG.error("do-rebalance error!", e);
             // Rollback the changes on ZK
             if (isSetTaskInfo) {
-                    try {
-                        StormClusterState clusterState =
-                                data.getStormClusterState();
-                        clusterState.remove_task(topologyid, newTasks);
-                    } catch (Exception e1) {
-                        LOG.error(
-                                "Failed to rollback the changes on ZK for 
task-"
-                                        + newTasks, e);
-                    }
+                try {
+                    StormClusterState clusterState = 
data.getStormClusterState();
+                    clusterState.remove_task(topologyid, newTasks);
+                } catch (Exception e1) {
+                    LOG.error("Failed to rollback the changes on ZK for task-" 
+ newTasks, e);
                 }
             }
+        }
 
         DelayStatusTransitionCallback delayCallback =
-                new DelayStatusTransitionCallback(data, topologyid, oldStatus,
-                        StatusType.rebalancing, StatusType.done_rebalance);
+                new DelayStatusTransitionCallback(data, topologyid, oldStatus, 
StatusType.rebalancing, StatusType.done_rebalance);
         return delayCallback.execute();
     }
 
-    private void setTaskInfo(StormTopology oldTopology,
-            StormTopology newTopology) throws Exception {
+    private void setTaskInfo(StormTopology oldTopology, StormTopology 
newTopology) throws Exception {
         StormClusterState clusterState = data.getStormClusterState();
         // Retrieve the max task ID
-        TreeSet<Integer> taskIds =
-                new TreeSet<Integer>(clusterState.task_ids(topologyid));
+        TreeSet<Integer> taskIds = new 
TreeSet<Integer>(clusterState.task_ids(topologyid));
         int cnt = taskIds.descendingIterator().next();
 
         cnt = setBoltInfo(oldTopology, newTopology, cnt, clusterState);
         cnt = setSpoutInfo(oldTopology, newTopology, cnt, clusterState);
     }
 
-    private int setBoltInfo(StormTopology oldTopology,
-            StormTopology newTopology, int cnt, StormClusterState clusterState)
-            throws Exception {
+    private int setBoltInfo(StormTopology oldTopology, StormTopology 
newTopology, int cnt, StormClusterState clusterState) throws Exception {
         Map<String, Bolt> oldBolts = oldTopology.get_bolts();
         Map<String, Bolt> bolts = newTopology.get_bolts();
         for (Entry<String, Bolt> entry : oldBolts.entrySet()) {
             String boltName = entry.getKey();
             Bolt oldBolt = entry.getValue();
             Bolt bolt = bolts.get(boltName);
-            if (oldBolt.get_common().get_parallelism_hint() > bolt.get_common()
-                    .get_parallelism_hint()) {
-                int removedTaskNum =
-                        oldBolt.get_common().get_parallelism_hint()
-                                - bolt.get_common().get_parallelism_hint();
-                TreeSet<Integer> taskIds =
-                        new TreeSet<Integer>(
-                                clusterState.task_ids_by_componentId(
-                                        topologyid, boltName));
-                Iterator<Integer> descendIterator =
-                        taskIds.descendingIterator();
+            if (oldBolt.get_common().get_parallelism_hint() > 
bolt.get_common().get_parallelism_hint()) {
+                int removedTaskNum = 
oldBolt.get_common().get_parallelism_hint() - 
bolt.get_common().get_parallelism_hint();
+                TreeSet<Integer> taskIds = new 
TreeSet<Integer>(clusterState.task_ids_by_componentId(topologyid, boltName));
+                Iterator<Integer> descendIterator = 
taskIds.descendingIterator();
                 while (--removedTaskNum >= 0) {
                     int taskId = descendIterator.next();
                     removeTask(topologyid, taskId, clusterState);
-                    LOG.info("Remove bolt task, taskId=" + taskId + " for "
-                            + boltName);
+                    LOG.info("Remove bolt task, taskId=" + taskId + " for " + 
boltName);
                 }
-            } else if (oldBolt.get_common().get_parallelism_hint() == bolt
-                    .get_common().get_parallelism_hint()) {
+            } else if (oldBolt.get_common().get_parallelism_hint() == 
bolt.get_common().get_parallelism_hint()) {
                 continue;
             } else {
-                int delta =
-                        bolt.get_common().get_parallelism_hint()
-                                - oldBolt.get_common().get_parallelism_hint();
+                int delta = bolt.get_common().get_parallelism_hint() - 
oldBolt.get_common().get_parallelism_hint();
                 Map<Integer, TaskInfo> taskInfoMap = new HashMap<Integer, 
TaskInfo>();
 
                 for (int i = 1; i <= delta; i++) {
                     cnt++;
-                    TaskInfo taskInfo =
-                            new TaskInfo((String) entry.getKey(), "bolt");
+                    TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), 
"bolt");
                     taskInfoMap.put(cnt, taskInfo);
                     newTasks.add(cnt);
-                    LOG.info("Setup new bolt task, taskId=" + cnt + " for "
-                            + boltName);
+                    LOG.info("Setup new bolt task, taskId=" + cnt + " for " + 
boltName);
                 }
                 clusterState.add_task(topologyid, taskInfoMap);
             }
@@ -240,52 +193,35 @@ public class DoRebalanceTransitionCallback extends 
BaseCallback {
         return cnt;
     }
 
-    private int setSpoutInfo(StormTopology oldTopology,
-            StormTopology newTopology, int cnt, StormClusterState clusterState)
-            throws Exception {
+    private int setSpoutInfo(StormTopology oldTopology, StormTopology 
newTopology, int cnt, StormClusterState clusterState) throws Exception {
         Map<String, SpoutSpec> oldSpouts = oldTopology.get_spouts();
         Map<String, SpoutSpec> spouts = newTopology.get_spouts();
         for (Entry<String, SpoutSpec> entry : oldSpouts.entrySet()) {
             String spoutName = entry.getKey();
             SpoutSpec oldSpout = entry.getValue();
             SpoutSpec spout = spouts.get(spoutName);
-            if (oldSpout.get_common().get_parallelism_hint() > spout
-                    .get_common().get_parallelism_hint()) {
-                int removedTaskNum =
-                        oldSpout.get_common().get_parallelism_hint()
-                                - spout.get_common().get_parallelism_hint();
-                TreeSet<Integer> taskIds =
-                        new TreeSet<Integer>(
-                                clusterState.task_ids_by_componentId(
-                                        topologyid, spoutName));
-                Iterator<Integer> descendIterator =
-                        taskIds.descendingIterator();
+            if (oldSpout.get_common().get_parallelism_hint() > 
spout.get_common().get_parallelism_hint()) {
+                int removedTaskNum = 
oldSpout.get_common().get_parallelism_hint() - 
spout.get_common().get_parallelism_hint();
+                TreeSet<Integer> taskIds = new 
TreeSet<Integer>(clusterState.task_ids_by_componentId(topologyid, spoutName));
+                Iterator<Integer> descendIterator = 
taskIds.descendingIterator();
                 while (--removedTaskNum >= 0) {
                     int taskId = descendIterator.next();
                     removeTask(topologyid, taskId, clusterState);
-                    LOG.info("Remove spout task, taskId=" + taskId + " for "
-                            + spoutName);
+                    LOG.info("Remove spout task, taskId=" + taskId + " for " + 
spoutName);
                 }
 
-
-
-            } else if (oldSpout.get_common().get_parallelism_hint() == spout
-                    .get_common().get_parallelism_hint()) {
+            } else if (oldSpout.get_common().get_parallelism_hint() == 
spout.get_common().get_parallelism_hint()) {
                 continue;
             } else {
-                int delta =
-                        spout.get_common().get_parallelism_hint()
-                                - oldSpout.get_common().get_parallelism_hint();
+                int delta = spout.get_common().get_parallelism_hint() - 
oldSpout.get_common().get_parallelism_hint();
                 Map<Integer, TaskInfo> taskInfoMap = new HashMap<Integer, 
TaskInfo>();
 
                 for (int i = 1; i <= delta; i++) {
                     cnt++;
-                    TaskInfo taskInfo =
-                            new TaskInfo((String) entry.getKey(), "spout");
+                    TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), 
"spout");
                     taskInfoMap.put(cnt, taskInfo);
                     newTasks.add(cnt);
-                    LOG.info("Setup new spout task, taskId=" + cnt + " for "
-                            + spoutName);
+                    LOG.info("Setup new spout task, taskId=" + cnt + " for " + 
spoutName);
                 }
                 clusterState.add_task(topologyid, taskInfoMap);
             }
@@ -294,12 +230,10 @@ public class DoRebalanceTransitionCallback extends 
BaseCallback {
         return cnt;
     }
 
-    private void removeTask(String topologyId, int taskId,
-            StormClusterState clusterState) throws Exception {
+    private void removeTask(String topologyId, int taskId, StormClusterState 
clusterState) throws Exception {
         Set<Integer> taskIds = new HashSet<Integer>(taskId);
         clusterState.remove_task(topologyid, taskIds);
-        Map<Integer, TkHbCacheTime> TkHbs =
-                data.getTaskHeartbeatsCache(topologyid, false);
+        Map<Integer, TkHbCacheTime> TkHbs = 
data.getTaskHeartbeatsCache(topologyid, false);
         if (TkHbs != null) {
             TkHbs.remove(taskId);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java
index 4dad890..e169fab 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java
@@ -19,15 +19,14 @@ package com.alibaba.jstorm.callback.impl;
 
 import com.alibaba.jstorm.daemon.nimbus.NimbusData;
 import com.alibaba.jstorm.daemon.nimbus.StatusType;
+import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable;
 
 /**
  * The action when nimbus receive killed command.
- * 
- * 1. change current topology status as killed 2. one TIMEOUT seconds later, do
- * remove action, which remove topology from ZK
+ * <p/>
+ * 1. change current topology status as killed 2. one TIMEOUT seconds later, do remove action, which remove topology from ZK
  * 
  * @author Longda
- * 
  */
 public class KillTransitionCallback extends DelayStatusTransitionCallback {
 
@@ -35,4 +34,15 @@ public class KillTransitionCallback extends 
DelayStatusTransitionCallback {
         super(data, topologyid, null, StatusType.killed, StatusType.remove);
     }
 
+    @Override
+    public <T> Object execute(T... args) {
+        TopologyMetricsRunnable.KillTopologyEvent event = new 
TopologyMetricsRunnable.KillTopologyEvent();
+        event.clusterName = this.data.getClusterName();
+        event.topologyId = this.topologyid;
+        event.timestamp = System.currentTimeMillis();
+
+        this.data.getMetricRunnable().pushEvent(event);
+        return super.execute(args);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java
index 1b4841c..ffa2ec8 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java
@@ -24,9 +24,8 @@ import com.alibaba.jstorm.daemon.nimbus.TopologyAssign;
 import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent;
 
 /**
- * 1. every Config.NIMBUS_MONITOR_FREQ_SECS will call MonitorRunnable 2.
- * MonitorRunnable will call NimbusData.transition 3. NimbusData.transition will
- * this callback
+ * 1. every Config.NIMBUS_MONITOR_FREQ_SECS will call MonitorRunnable 2. MonitorRunnable will call NimbusData.transition 3. NimbusData.transition will this
+ * callback
  * 
  * 
  */
@@ -42,8 +41,7 @@ public class ReassignTransitionCallback extends BaseCallback {
         this.oldStatus = null;
     }
 
-    public ReassignTransitionCallback(NimbusData data, String topologyid,
-            StormStatus oldStatus) {
+    public ReassignTransitionCallback(NimbusData data, String topologyid, 
StormStatus oldStatus) {
         this.data = data;
         this.topologyid = topologyid;
         this.oldStatus = oldStatus;
@@ -59,6 +57,7 @@ public class ReassignTransitionCallback extends BaseCallback {
         assignEvent.setOldStatus(oldStatus);
 
         TopologyAssign.push(assignEvent);
+        assignEvent.waitFinish();
 
         return null;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java
index f65f542..476f4f5 100755
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java
@@ -22,21 +22,17 @@ import com.alibaba.jstorm.daemon.nimbus.NimbusData;
 import com.alibaba.jstorm.daemon.nimbus.StatusType;
 
 /**
- * The action when nimbus receive rebalance command. Rebalance command is only
- * valid when current status is active
+ * The action when nimbus receive rebalance command. Rebalance command is only valid when current status is active
  * 
- * 1. change current topology status as rebalancing 2. do_rebalance action after
- * 2 * TIMEOUT seconds
+ * 1. change current topology status as rebalancing 2. do_rebalance action after 2 * TIMEOUT seconds
  * 
  * @author Lixin/Longda
  * 
  */
 public class RebalanceTransitionCallback extends DelayStatusTransitionCallback 
{
 
-    public RebalanceTransitionCallback(NimbusData data, String topologyid,
-            StormStatus status) {
-        super(data, topologyid, status, StatusType.rebalancing,
-                StatusType.do_rebalance);
+    public RebalanceTransitionCallback(NimbusData data, String topologyid, 
StormStatus status) {
+        super(data, topologyid, status, StatusType.rebalancing, 
StatusType.do_rebalance);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java
index 231d8e9..8052c40 100644
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java
@@ -35,8 +35,7 @@ import com.alibaba.jstorm.daemon.nimbus.NimbusUtils;
  */
 public class RemoveTransitionCallback extends BaseCallback {
 
-    private static Logger LOG = LoggerFactory
-            .getLogger(RemoveTransitionCallback.class);
+    private static Logger LOG = 
LoggerFactory.getLogger(RemoveTransitionCallback.class);
 
     private NimbusData data;
     private String topologyid;
@@ -51,13 +50,13 @@ public class RemoveTransitionCallback extends BaseCallback {
         LOG.info("Begin to remove topology: " + topologyid);
         try {
 
-            StormBase stormBase =
-                    data.getStormClusterState().storm_base(topologyid, null);
+            StormBase stormBase = 
data.getStormClusterState().storm_base(topologyid, null);
             if (stormBase == null) {
                 LOG.info("Topology " + topologyid + " has been removed ");
                 return null;
             }
             data.getStormClusterState().remove_storm(topologyid);
+            data.getTasksHeartbeat().remove(topologyid);
             NimbusUtils.removeTopologyTaskTimeout(data, topologyid);
             LOG.info("Successfully removed ZK items topology: " + topologyid);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java
deleted file mode 100644
index ca4e0ee..0000000
--- 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.jstorm.callback.impl;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.callback.BaseCallback;
-import com.alibaba.jstorm.cluster.StormClusterState;
-import com.alibaba.jstorm.cluster.StormConfig;
-import com.alibaba.jstorm.cluster.StormStatus;
-import com.alibaba.jstorm.daemon.nimbus.NimbusData;
-import com.alibaba.jstorm.schedule.Assignment;
-import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
-
-/**
- * Update user configuration
- * 
- * @author Basti.lj
- */
-public class UpdateConfTransitionCallback extends BaseCallback {
-
-    private static Logger LOG = LoggerFactory
-            .getLogger(DelayStatusTransitionCallback.class);
-
-    public static final int DEFAULT_DELAY_SECONDS = 30;
-
-    private NimbusData data;
-    private String topologyId;
-    private StormStatus currentStatus;
-
-    public UpdateConfTransitionCallback(NimbusData data, String topologyId,
-            StormStatus currentStatus) {
-        this.data = data;
-        this.topologyId = topologyId;
-        this.currentStatus = currentStatus;
-    }
-
-    @Override
-    public <T> Object execute(T... args) {
-        StormClusterState clusterState = data.getStormClusterState();
-        try {
-            Map userConf = (Map) args[0];
-            Map topoConf =
-                    StormConfig.read_nimbus_topology_conf(data.getConf(),
-                            topologyId);
-            topoConf.putAll(userConf);
-            StormConfig.write_nimbus_topology_conf(data.getConf(), topologyId, 
topoConf);
-            
-            Assignment assignment =
-                    clusterState.assignment_info(topologyId, null);
-            assignment.setAssignmentType(AssignmentType.Config);
-            assignment.updateTimeStamp();
-            clusterState.set_assignment(topologyId, assignment);
-            LOG.info("Successfully update new config to ZK for " + topologyId);
-        } catch (Exception e) {
-            LOG.error("Failed to update user configuartion.", e);
-        }
-        return currentStatus;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java
----------------------------------------------------------------------
diff --git 
a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java
 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java
new file mode 100644
index 0000000..706b98b
--- /dev/null
+++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.jstorm.callback.impl;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.jstorm.callback.BaseCallback;
+import com.alibaba.jstorm.cluster.StormClusterState;
+import com.alibaba.jstorm.cluster.StormConfig;
+import com.alibaba.jstorm.cluster.StormStatus;
+import com.alibaba.jstorm.daemon.nimbus.NimbusData;
+import com.alibaba.jstorm.schedule.Assignment;
+import com.alibaba.jstorm.schedule.Assignment.AssignmentType;
+
+/**
+ * Transition callback that applies a configuration update to a topology.
+ * <p>
+ * The supplied configuration is merged into the topology configuration read
+ * through {@link StormConfig}, written back, and the topology's assignment is
+ * then re-published to the cluster state with type
+ * {@link AssignmentType#UpdateTopology} and a refreshed timestamp.
+ * 
+ * @author Basti.lj
+ */
+public class UpdateTopologyTransitionCallback extends BaseCallback {
+
+    // Bound to this class (not DelayStatusTransitionCallback) so log output is
+    // attributed to the correct logger category.
+    private static final Logger LOG = LoggerFactory.getLogger(UpdateTopologyTransitionCallback.class);
+
+    /** Not referenced inside this class; kept because it is part of the public API. */
+    public static final int DEFAULT_DELAY_SECONDS = 30;
+
+    private final NimbusData data;
+    private final String topologyId;
+    private final StormStatus currentStatus;
+
+    /**
+     * @param data          nimbus data used to obtain the configuration and the cluster state
+     * @param topologyId    id of the topology whose configuration is updated
+     * @param currentStatus status handed back unchanged by {@link #execute(Object...)}
+     */
+    public UpdateTopologyTransitionCallback(NimbusData data, String topologyId, StormStatus currentStatus) {
+        this.data = data;
+        this.topologyId = topologyId;
+        this.currentStatus = currentStatus;
+    }
+
+    /**
+     * Merges {@code args[0]} into the topology configuration and re-publishes
+     * the topology's assignment.
+     * <p>
+     * Any exception raised along the way (including a missing or non-{@link Map}
+     * first argument) is logged and swallowed; the method never propagates it.
+     * 
+     * @param args {@code args[0]} must be a {@link Map} holding the entries to merge
+     * @return the {@code currentStatus} passed to the constructor, whether or not
+     *         the update succeeded
+     */
+    @Override
+    public <T> Object execute(T... args) {
+        StormClusterState clusterState = data.getStormClusterState();
+        try {
+            Map userConf = (Map) args[0];
+            Map topoConf = StormConfig.read_nimbus_topology_conf(data.getConf(), topologyId);
+            topoConf.putAll(userConf);
+            StormConfig.write_nimbus_topology_conf(data.getConf(), topologyId, topoConf);
+
+            // NOTE(review): the merged conf has already been written at this point; if any
+            // call below throws, the failure is only logged and the written conf is not
+            // rolled back - confirm this is acceptable.
+            Assignment assignment = clusterState.assignment_info(topologyId, null);
+            assignment.setAssignmentType(AssignmentType.UpdateTopology);
+            assignment.updateTimeStamp();
+            clusterState.set_assignment(topologyId, assignment);
+            LOG.info("Successfully update topology information to ZK for " + topologyId);
+        } catch (Exception e) {
+            LOG.error("Failed to update topology.", e);
+        }
+        return currentStatus;
+    }
+
+}

Reply via email to