http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java index 20b5f46..426e074 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/RocksTTLDBCache.java @@ -48,23 +48,23 @@ import com.alibaba.jstorm.utils.PathUtils; public class RocksTTLDBCache implements JStormCache { private static final long serialVersionUID = 705938812240167583L; private static Logger LOG = LoggerFactory.getLogger(RocksTTLDBCache.class); - + static { RocksDB.loadLibrary(); } - + public static final String ROCKSDB_ROOT_DIR = "rocksdb.root.dir"; public static final String ROCKSDB_RESET = "rocksdb.reset"; protected TtlDB ttlDB; protected String rootDir; protected TreeMap<Integer, ColumnFamilyHandle> windowHandlers = new TreeMap<Integer, ColumnFamilyHandle>(); - + public void initDir(Map<Object, Object> conf) { String confDir = (String) conf.get(ROCKSDB_ROOT_DIR); if (StringUtils.isBlank(confDir) == true) { throw new RuntimeException("Doesn't set rootDir of rocksDB"); } - + boolean clean = ConfigExtension.getNimbusCacheReset(conf); LOG.info("RocksDB reset is " + clean); if (clean == true) { @@ -75,7 +75,7 @@ public class RocksTTLDBCache implements JStormCache { throw new RuntimeException("Failed to cleanup rooDir of rocksDB " + confDir); } } - + File file = new File(confDir); if (file.exists() == false) { try { @@ -86,53 +86,53 @@ public class RocksTTLDBCache implements JStormCache { throw new RuntimeException("Failed to mkdir rooDir of rocksDB " + confDir); } } - + rootDir = file.getAbsolutePath(); } - - public void initDb(List<Integer> list) throws Exception{ + + public void initDb(List<Integer> list) throws Exception { LOG.info("Begin to init rocksDB of {}", rootDir); - + DBOptions dbOptions = null; - + List<ColumnFamilyDescriptor> columnFamilyNames = new ArrayList<ColumnFamilyDescriptor>(); columnFamilyNames.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); for (Integer timeout : list) { columnFamilyNames.add(new ColumnFamilyDescriptor(String.valueOf(timeout).getBytes())); } - + List<Integer> ttlValues = new ArrayList<Integer>(); // Default column family with infinite lifetime // ATTENSION, the first must be 0, RocksDB.java API has this limitation ttlValues.add(0); // new column family with list second ttl ttlValues.addAll(list); - + try { dbOptions = new DBOptions().setCreateMissingColumnFamilies(true).setCreateIfMissing(true); - + List<ColumnFamilyHandle> columnFamilyHandleList = new ArrayList<ColumnFamilyHandle>(); - + ttlDB = TtlDB.open(dbOptions, rootDir, columnFamilyNames, columnFamilyHandleList, ttlValues, false); - + for (int i = 0; i < ttlValues.size(); i++) { windowHandlers.put(ttlValues.get(i), columnFamilyHandleList.get(i)); } - + LOG.info("Successfully init rocksDB of {}", rootDir); - } finally { - + } finally { + if (dbOptions != null) { dbOptions.dispose(); } } } - + @Override - public void init(Map<Object, Object> conf) throws Exception{ + public void init(Map<Object, Object> conf) throws Exception { // TODO Auto-generated method stub initDir(conf); - + List<Integer> list = new ArrayList<Integer>(); if (conf.get(TAG_TIMEOUT_LIST) != null) { for (Object obj : (List) ConfigExtension.getCacheTimeoutList(conf)) { @@ -140,11 +140,11 @@ public class RocksTTLDBCache implements JStormCache { if (timeoutSecond == null || timeoutSecond <= 0) { continue; } - + list.add(timeoutSecond); } } - + // Add retry logic boolean isSuccess = false; for (int i = 0; i < 3; i++) { @@ -152,64 +152,61 @@ public class RocksTTLDBCache implements JStormCache { initDb(list); isSuccess = true; break; - }catch(Exception e) { + } catch (Exception e) { LOG.warn("Failed to init rocksDB " + rootDir, e); try { PathUtils.rmr(rootDir); } catch (IOException e1) { // TODO Auto-generated catch block - + } } } - + if (isSuccess == false) { throw new RuntimeException("Failed to init rocksDB " + rootDir); } } - + @Override public void cleanup() { LOG.info("Begin to close rocketDb of {}", rootDir); - + for (ColumnFamilyHandle columnFamilyHandle : windowHandlers.values()) { columnFamilyHandle.dispose(); } - + if (ttlDB != null) { ttlDB.close(); } - + LOG.info("Successfully closed rocketDb of {}", rootDir); } - + @Override - public Object get(String key) { + public Object get(String key) { // TODO Auto-generated method stub for (Entry<Integer, ColumnFamilyHandle> entry : windowHandlers.entrySet()) { try { - byte[] data = ttlDB.get(entry.getValue(), - key.getBytes()); + byte[] data = ttlDB.get(entry.getValue(), key.getBytes()); if (data != null) { try { return Utils.javaDeserialize(data); - }catch(Exception e) { + } catch (Exception e) { LOG.error("Failed to deserialize obj of " + key); - ttlDB.remove(entry.getValue(), - key.getBytes()); + ttlDB.remove(entry.getValue(), key.getBytes()); return null; } } - - - }catch(Exception e) { - + + } catch (Exception e) { + } } - + return null; } - + @Override public void getBatch(Map<String, Object> map) { List<byte[]> lookupKeys = new ArrayList<byte[]>(); @@ -217,26 +214,26 @@ public class RocksTTLDBCache implements JStormCache { lookupKeys.add(key.getBytes()); } for (Entry<Integer, ColumnFamilyHandle> entry : windowHandlers.entrySet()) { - + List<ColumnFamilyHandle> cfHandlers = new ArrayList<ColumnFamilyHandle>(); for (String key : map.keySet()) { cfHandlers.add(entry.getValue()); } - + try { Map<byte[], byte[]> results = ttlDB.multiGet(cfHandlers, lookupKeys); if (results == null || results.size() == 0) { continue; } - + for (Entry<byte[], byte[]> resultEntry : results.entrySet()) { byte[] keyByte = resultEntry.getKey(); byte[] valueByte = resultEntry.getValue(); - + if (keyByte == null || valueByte == null) { continue; } - + Object value = null; try { value = Utils.javaDeserialize(valueByte); @@ -245,35 +242,31 @@ public class RocksTTLDBCache implements JStormCache { ttlDB.remove(entry.getValue(), keyByte); continue; } - + map.put(new String(keyByte), value); } - - return ; + + return; } catch (Exception e) { LOG.error("Failed to query " + map.keySet() + ", in window: " + entry.getKey()); } } - + return; } - - + @Override public void remove(String key) { for (Entry<Integer, ColumnFamilyHandle> entry : windowHandlers.entrySet()) { try { - ttlDB.remove(entry.getValue(), - key.getBytes()); - - - - }catch(Exception e) { + ttlDB.remove(entry.getValue(), key.getBytes()); + + } catch (Exception e) { LOG.error("Failed to remove " + key); } } } - + @Override public void removeBatch(Collection<String> keys) { // TODO Auto-generated method stub @@ -281,22 +274,22 @@ public class RocksTTLDBCache implements JStormCache { remove(key); } } - + protected void put(String key, Object value, Entry<Integer, ColumnFamilyHandle> entry) { - + byte[] data = Utils.javaSerialize(value); try { ttlDB.put(entry.getValue(), key.getBytes(), data); - }catch(Exception e) { + } catch (Exception e) { LOG.error("Failed put into cache, " + key, e); - return ; + return; } - + for (Entry<Integer, ColumnFamilyHandle> removeEntry : windowHandlers.entrySet()) { if (removeEntry.getKey().equals(entry.getKey())) { continue; } - + try { ttlDB.remove(removeEntry.getValue(), key.getBytes()); } catch (Exception e) { @@ -305,72 +298,70 @@ public class RocksTTLDBCache implements JStormCache { } } } - + protected Entry<Integer, ColumnFamilyHandle> getHandler(int timeoutSecond) { ColumnFamilyHandle cfHandler = null; Entry<Integer, ColumnFamilyHandle> ceilingEntry = windowHandlers.ceilingEntry(timeoutSecond); if (ceilingEntry != null) { return ceilingEntry; - }else { + } else { return windowHandlers.firstEntry(); } } - + @Override public void put(String key, Object value, int timeoutSecond) { // TODO Auto-generated method stub - - + put(key, value, getHandler(timeoutSecond)); - + } - + @Override - public void put(String key, Object value) { + public void put(String key, Object value) { put(key, value, windowHandlers.firstEntry()); } - - protected void putBatch(Map<String, Object> map, Entry<Integer, ColumnFamilyHandle> putEntry ) { + + protected void putBatch(Map<String, Object> map, Entry<Integer, ColumnFamilyHandle> putEntry) { // TODO Auto-generated method stub WriteOptions writeOpts = null; WriteBatch writeBatch = null; - + Set<byte[]> putKeys = new HashSet<byte[]>(); - + try { writeOpts = new WriteOptions(); writeBatch = new WriteBatch(); - + for (Entry<String, Object> entry : map.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); - - + byte[] data = Utils.javaSerialize(value); - + if (StringUtils.isBlank(key) || data == null || data.length == 0) { continue; } - + byte[] keyByte = key.getBytes(); writeBatch.put(putEntry.getValue(), keyByte, data); - + putKeys.add(keyByte); } - + ttlDB.write(writeOpts, writeBatch); - }catch(Exception e) { + } catch (Exception e) { LOG.error("Failed to putBatch into DB, " + map.keySet(), e); - }finally { + } finally { if (writeOpts != null) { writeOpts.dispose(); } - + if (writeBatch != null) { writeBatch.dispose(); } } - + for (Entry<Integer, ColumnFamilyHandle> entry : windowHandlers.entrySet()) { if (entry.getKey().equals(putEntry.getKey())) { continue; @@ -385,85 +376,85 @@ public class RocksTTLDBCache implements JStormCache { } } } - + @Override - public void putBatch(Map<String, Object> map) { + public void putBatch(Map<String, Object> map) { // TODO Auto-generated method stub putBatch(map, windowHandlers.firstEntry()); } - + @Override public void putBatch(Map<String, Object> map, int timeoutSeconds) { // TODO Auto-generated method stub putBatch(map, getHandler(timeoutSeconds)); } - -// public void put() throws Exception { - -// } -// -// public void write() throws Exception { -// Options options = null; -// WriteBatch wb1 = null; -// WriteBatch wb2 = null; -// WriteOptions opts = null; -// try { -// options = new Options().setMergeOperator(new StringAppendOperator()).setCreateIfMissing(true); -// db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); -// opts = new WriteOptions(); -// wb1 = new WriteBatch(); -// wb1.put("key1".getBytes(), "aa".getBytes()); -// wb1.merge("key1".getBytes(), "bb".getBytes()); -// wb2 = new WriteBatch(); -// wb2.put("key2".getBytes(), "xx".getBytes()); -// wb2.merge("key2".getBytes(), "yy".getBytes()); -// db.write(opts, wb1); -// db.write(opts, wb2); -// assertThat(db.get("key1".getBytes())).isEqualTo("aa,bb".getBytes()); -// assertThat(db.get("key2".getBytes())).isEqualTo("xx,yy".getBytes()); -// } finally { -// if (db != null) { -// db.close(); -// } -// if (wb1 != null) { -// wb1.dispose(); -// } -// if (wb2 != null) { -// wb2.dispose(); -// } -// if (options != null) { -// options.dispose(); -// } -// if (opts != null) { -// opts.dispose(); -// } -// } -// } -// - -// -// public void remove() throws Exception { -// RocksDB db = null; -// WriteOptions wOpt; -// try { -// wOpt = new WriteOptions(); -// db = RocksDB.open(dbFolder.getRoot().getAbsolutePath()); -// db.put("key1".getBytes(), "value".getBytes()); -// db.put("key2".getBytes(), "12345678".getBytes()); -// assertThat(db.get("key1".getBytes())).isEqualTo("value".getBytes()); -// assertThat(db.get("key2".getBytes())).isEqualTo("12345678".getBytes()); -// db.remove("key1".getBytes()); -// db.remove(wOpt, "key2".getBytes()); -// assertThat(db.get("key1".getBytes())).isNull(); -// assertThat(db.get("key2".getBytes())).isNull(); -// } finally { -// if (db != null) { -// db.close(); -// } -// } -// } -// -// public void ttlDbOpenWithColumnFamilies() throws Exception, InterruptedException { -// -// } + + // public void put() throws Exception { + + // } + // + // public void write() throws Exception { + // Options options = null; + // WriteBatch wb1 = null; + // WriteBatch wb2 = null; + // WriteOptions opts = null; + // try { + // options = new Options().setMergeOperator(new StringAppendOperator()).setCreateIfMissing(true); + // db = RocksDB.open(options, dbFolder.getRoot().getAbsolutePath()); + // opts = new WriteOptions(); + // wb1 = new WriteBatch(); + // wb1.put("key1".getBytes(), "aa".getBytes()); + // wb1.merge("key1".getBytes(), "bb".getBytes()); + // wb2 = new WriteBatch(); + // wb2.put("key2".getBytes(), "xx".getBytes()); + // wb2.merge("key2".getBytes(), "yy".getBytes()); + // db.write(opts, wb1); + // db.write(opts, wb2); + // assertThat(db.get("key1".getBytes())).isEqualTo("aa,bb".getBytes()); + // assertThat(db.get("key2".getBytes())).isEqualTo("xx,yy".getBytes()); + // } finally { + // if (db != null) { + // db.close(); + // } + // if (wb1 != null) { + // wb1.dispose(); + // } + // if (wb2 != null) { + // wb2.dispose(); + // } + // if (options != null) { + // options.dispose(); + // } + // if (opts != null) { + // opts.dispose(); + // } + // } + // } + // + + // + // public void remove() throws Exception { + // RocksDB db = null; + // WriteOptions wOpt; + // try { + // wOpt = new WriteOptions(); + // db = RocksDB.open(dbFolder.getRoot().getAbsolutePath()); + // db.put("key1".getBytes(), "value".getBytes()); + // db.put("key2".getBytes(), "12345678".getBytes()); + // assertThat(db.get("key1".getBytes())).isEqualTo("value".getBytes()); + // assertThat(db.get("key2".getBytes())).isEqualTo("12345678".getBytes()); + // db.remove("key1".getBytes()); + // db.remove(wOpt, "key2".getBytes()); + // assertThat(db.get("key1".getBytes())).isNull(); + // assertThat(db.get("key2".getBytes())).isNull(); + // } finally { + // if (db != null) { + // db.close(); + // } + // } + // } + // + // public void ttlDbOpenWithColumnFamilies() throws Exception, InterruptedException { + // + // } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java index d4d9905..8924a81 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/cache/TimeoutMemCache.java @@ -17,50 +17,46 @@ */ package com.alibaba.jstorm.cache; +import com.alibaba.jstorm.client.ConfigExtension; +import com.alibaba.jstorm.utils.JStormUtils; +import com.alibaba.jstorm.utils.TimeCacheMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.TreeMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.client.ConfigExtension; -import com.alibaba.jstorm.utils.JStormUtils; -import com.alibaba.jstorm.utils.TimeCacheMap; - public class TimeoutMemCache implements JStormCache { private static final long serialVersionUID = 705938812240167583L; private static Logger LOG = LoggerFactory.getLogger(TimeoutMemCache.class); - - + protected int defaultTimeout; protected final TreeMap<Integer, TimeCacheMap<String, Object>> cacheWindows = new TreeMap<Integer, TimeCacheMap<String, Object>>(); - + public TimeoutMemCache() { - } - + protected void registerCacheWindow(int timeoutSecond) { synchronized (this) { if (cacheWindows.get(timeoutSecond) == null) { TimeCacheMap<String, Object> cacheWindow = new TimeCacheMap<String, Object>(timeoutSecond); cacheWindows.put(timeoutSecond, cacheWindow); - + LOG.info("Successfully register CacheWindow: " + timeoutSecond); } else { LOG.info("CacheWindow: " + timeoutSecond + " has been registered"); } } } - + @Override public void init(Map<Object, Object> conf) { - // TODO Auto-generated method stub this.defaultTimeout = ConfigExtension.getDefaultCacheTimeout(conf); registerCacheWindow(defaultTimeout); - + List<Object> list = (List) ConfigExtension.getCacheTimeoutList(conf); if (list != null) { for (Object obj : list) { @@ -68,21 +64,17 @@ public class TimeoutMemCache implements JStormCache { if (timeoutSecond == null) { continue; } - registerCacheWindow(timeoutSecond); } } } - + @Override public void cleanup() { - // TODO Auto-generated method stub - } - + @Override public Object get(String key) { - // TODO Auto-generated method stub // @@@ TODO // in order to improve performance, it can be query from defaultWindow firstly, then others for (TimeCacheMap<String, Object> cacheWindow : cacheWindows.values()) { @@ -93,21 +85,17 @@ public class TimeoutMemCache implements JStormCache { } return null; } - + @Override public void getBatch(Map<String, Object> map) { - // TODO Auto-generated method stub for (String key : map.keySet()) { Object obj = get(key); map.put(key, obj); } - - return; } - + @Override public void remove(String key) { - // TODO Auto-generated method stub for (TimeCacheMap<String, Object> cacheWindow : cacheWindows.values()) { Object ret = cacheWindow.remove(key); if (ret != null) { @@ -115,64 +103,52 @@ public class TimeoutMemCache implements JStormCache { } } } - + @Override public void removeBatch(Collection<String> keys) { - // TODO Auto-generated method stub for (String key : keys) { remove(key); } - - return; } - + @Override public void put(String key, Object value, int timeoutSecond) { - - // TODO Auto-generated method stub Entry<Integer, TimeCacheMap<String, Object>> ceilingEntry = cacheWindows.ceilingEntry(timeoutSecond); if (ceilingEntry == null) { put(key, value); - return ; - }else { + } else { remove(key); ceilingEntry.getValue().put(key, value); } - } - + @Override public void put(String key, Object value) { remove(key); TimeCacheMap<String, Object> bestWindow = cacheWindows.get(defaultTimeout); bestWindow.put(key, value); } - + @Override - public void putBatch(Map<String, Object> map) { - // TODO Auto-generated method stub + public void putBatch(Map<String, Object> map) { for (Entry<String, Object> entry : map.entrySet()) { put(entry.getKey(), entry.getValue()); } - } - + @Override public void putBatch(Map<String, Object> map, int timeoutSeconds) { - // TODO Auto-generated method stub for (Entry<String, Object> entry : map.entrySet()) { put(entry.getKey(), entry.getValue(), timeoutSeconds); } - } - public int getDefaultTimeout() { - return defaultTimeout; - } + public int getDefaultTimeout() { + return defaultTimeout; + } + + public void setDefaultTimeout(int defaultTimeout) { + this.defaultTimeout = defaultTimeout; + } - public void setDefaultTimeout(int defaultTimeout) { - this.defaultTimeout = defaultTimeout; - } - - } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java index d21cc4a..e4466e7 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopRunnable.java @@ -33,8 +33,7 @@ import com.alibaba.jstorm.utils.JStormUtils; * */ public class AsyncLoopRunnable implements Runnable { - private static Logger LOG = LoggerFactory - .getLogger(AsyncLoopRunnable.class); + private static Logger LOG = LoggerFactory.getLogger(AsyncLoopRunnable.class); // set shutdown as false is to private static AtomicBoolean shutdown = new AtomicBoolean(false); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java index ce49c51..2f722b9 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/AsyncLoopThread.java @@ -28,16 +28,14 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.SmartThread; /** - * Wrapper Timer thread Every several seconds execute afn, if something is run, - * run kill_fn + * Wrapper Timer thread Every several seconds execute afn, if something is run, run kill_fn * * * @author yannian * */ public class AsyncLoopThread implements SmartThread { - private static final Logger LOG = LoggerFactory - .getLogger(AsyncLoopThread.class); + private static final Logger LOG = LoggerFactory.getLogger(AsyncLoopThread.class); private Thread thread; @@ -47,18 +45,15 @@ public class AsyncLoopThread implements SmartThread { this.init(afn, false, Thread.NORM_PRIORITY, true); } - public AsyncLoopThread(RunnableCallback afn, boolean daemon, int priority, - boolean start) { + public AsyncLoopThread(RunnableCallback afn, boolean daemon, int priority, boolean start) { this.init(afn, daemon, priority, start); } - public AsyncLoopThread(RunnableCallback afn, boolean daemon, - RunnableCallback kill_fn, int priority, boolean start) { + public AsyncLoopThread(RunnableCallback afn, boolean daemon, RunnableCallback kill_fn, int priority, boolean start) { this.init(afn, daemon, kill_fn, priority, start); } - public void init(RunnableCallback afn, boolean daemon, int priority, - boolean start) { + public void init(RunnableCallback afn, boolean daemon, int priority, boolean start) { RunnableCallback kill_fn = new AsyncLoopDefaultKill(); this.init(afn, daemon, kill_fn, priority, start); } @@ -72,8 +67,7 @@ public class AsyncLoopThread implements SmartThread { * @param args_fn * @param start */ - private void init(RunnableCallback afn, boolean daemon, - RunnableCallback kill_fn, int priority, boolean start) { + private void init(RunnableCallback afn, boolean daemon, RunnableCallback kill_fn, int priority, boolean start) { if (kill_fn == null) { kill_fn = new AsyncLoopDefaultKill(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java index 4f1764b..132418f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/DefaultWatcherCallBack.java @@ -33,13 +33,11 @@ import com.alibaba.jstorm.zk.ZkKeeperStates; */ public class DefaultWatcherCallBack implements WatcherCallBack { - private static Logger LOG = LoggerFactory - .getLogger(DefaultWatcherCallBack.class); + private static Logger LOG = LoggerFactory.getLogger(DefaultWatcherCallBack.class); @Override public void execute(KeeperState state, EventType type, String path) { - LOG.info("Zookeeper state update:" + ZkKeeperStates.getStateName(state) - + "," + ZkEventTypes.getStateName(type) + "," + path); + LOG.info("Zookeeper state update:" + ZkKeeperStates.getStateName(state) + "," + ZkEventTypes.getStateName(type) + "," + path); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java index 79ef633..5670220 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DelayStatusTransitionCallback.java @@ -36,15 +36,13 @@ import com.alibaba.jstorm.utils.JStormUtils; /** * * - * The action when nimbus receive kill command 1. set the topology status as - * target 2. wait 2 * Timeout seconds later, do removing topology from ZK + * The action when nimbus receive kill command 1. set the topology status as target 2. wait 2 * Timeout seconds later, do removing topology from ZK * * @author Longda */ public class DelayStatusTransitionCallback extends BaseCallback { - private static Logger LOG = LoggerFactory - .getLogger(DelayStatusTransitionCallback.class); + private static Logger LOG = LoggerFactory.getLogger(DelayStatusTransitionCallback.class); public static final int DEFAULT_DELAY_SECONDS = 30; @@ -54,8 +52,7 @@ public class DelayStatusTransitionCallback extends BaseCallback { protected StatusType newType; protected StatusType nextAction; - public DelayStatusTransitionCallback(NimbusData data, String topologyid, - StormStatus oldStatus, StatusType newType, StatusType nextAction) { + public DelayStatusTransitionCallback(NimbusData data, String topologyid, StormStatus oldStatus, StatusType newType, StatusType nextAction) { this.data = data; this.topologyid = topologyid; this.oldStatus = oldStatus; @@ -73,13 +70,8 @@ public class DelayStatusTransitionCallback extends BaseCallback { Map<?, ?> map = null; try { - map = - StormConfig.read_nimbus_topology_conf(data.getConf(), - topologyid); - delaySecs = - JStormUtils.parseInt( - map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), - DEFAULT_DELAY_SECONDS); + map = StormConfig.read_nimbus_topology_conf(data.getConf(), topologyid); + delaySecs = JStormUtils.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS), DEFAULT_DELAY_SECONDS); } catch (Exception e) { LOG.info("Failed to get topology configuration " + topologyid); } @@ -98,12 +90,9 @@ public class DelayStatusTransitionCallback extends BaseCallback { @Override public <T> Object execute(T... args) { int delaySecs = getDelaySeconds(args); - LOG.info("Delaying event " + newType + " for " + delaySecs - + " secs for " + topologyid); + LOG.info("Delaying event " + newType + " for " + delaySecs + " secs for " + topologyid); - data.getScheduExec().schedule( - new DelayEventRunnable(data, topologyid, nextAction, args), - delaySecs, TimeUnit.SECONDS); + data.getScheduExec().schedule(new DelayEventRunnable(data, topologyid, nextAction, args), delaySecs, TimeUnit.SECONDS); return new StormStatus(delaySecs, newType); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java index 41706b3..5060aec 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/DoRebalanceTransitionCallback.java @@ -17,54 +17,44 @@ */ package com.alibaba.jstorm.callback.impl; -import java.util.*; -import java.util.Map.Entry; - -import org.apache.log4j.Logger; - import backtype.storm.Config; import backtype.storm.generated.Bolt; import backtype.storm.generated.SpoutSpec; import backtype.storm.generated.StormTopology; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.callback.BaseCallback; -import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.cluster.StormClusterState; import com.alibaba.jstorm.cluster.StormConfig; import com.alibaba.jstorm.cluster.StormStatus; -import com.alibaba.jstorm.daemon.nimbus.NimbusData; -import com.alibaba.jstorm.daemon.nimbus.NimbusUtils; -import com.alibaba.jstorm.daemon.nimbus.StatusType; -import com.alibaba.jstorm.daemon.nimbus.TopologyAssign; -import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent; +import com.alibaba.jstorm.daemon.nimbus.*; import com.alibaba.jstorm.task.TaskInfo; import com.alibaba.jstorm.task.TkHbCacheTime; import com.alibaba.jstorm.utils.JStormUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.Map.Entry; /** * Do real rebalance action. * - * After nimbus receive one rebalance command, it will do as following: 1. set - * topology status as rebalancing 2. delay 2 * timeout seconds 3. do this - * callback + * After nimbus receive one rebalance command, it will do as following: 1. set topology status as rebalancing 2. delay 2 * timeout seconds 3. do this callback * * @author Xin.Li/Longda * */ public class DoRebalanceTransitionCallback extends BaseCallback { - private static Logger LOG = Logger - .getLogger(DoRebalanceTransitionCallback.class); + private static Logger LOG = LoggerFactory.getLogger(DoRebalanceTransitionCallback.class); private NimbusData data; private String topologyid; private StormStatus oldStatus; private Set<Integer> newTasks; - public DoRebalanceTransitionCallback(NimbusData data, String topologyid, - StormStatus status) { + public DoRebalanceTransitionCallback(NimbusData data, String topologyid, StormStatus status) { this.data = data; this.topologyid = topologyid; this.oldStatus = status; @@ -87,28 +77,17 @@ public class DoRebalanceTransitionCallback extends BaseCallback { Map stormConf = data.getConf(); // Update topology code - Map topoConf = - StormConfig.read_nimbus_topology_conf(stormConf, - topologyid); - StormTopology rawOldTopology = - StormConfig.read_nimbus_topology_code(stormConf, - topologyid); - StormTopology rawNewTopology = - NimbusUtils.normalizeTopology(conf, rawOldTopology, - true); + Map topoConf = StormConfig.read_nimbus_topology_conf(stormConf, topologyid); + StormTopology rawOldTopology = StormConfig.read_nimbus_topology_code(stormConf, topologyid); + StormTopology rawNewTopology = NimbusUtils.normalizeTopology(conf, rawOldTopology, true); StormTopology sysOldTopology = rawOldTopology.deepCopy(); StormTopology sysNewTopology = rawNewTopology.deepCopy(); if (conf.get(Config.TOPOLOGY_ACKER_EXECUTORS) != null) { Common.add_acker(topoConf, sysOldTopology); Common.add_acker(conf, sysNewTopology); - int ackerNum = - JStormUtils.parseInt(conf - .get(Config.TOPOLOGY_ACKER_EXECUTORS)); - int oldAckerNum = - JStormUtils.parseInt(topoConf - .get(Config.TOPOLOGY_ACKER_EXECUTORS)); - LOG.info("Update acker from oldAckerNum=" + oldAckerNum - + " to ackerNum=" + ackerNum); + int ackerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS)); + int oldAckerNum = JStormUtils.parseInt(topoConf.get(Config.TOPOLOGY_ACKER_EXECUTORS)); + LOG.info("Update acker from oldAckerNum=" + oldAckerNum + " to ackerNum=" + ackerNum); topoConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, ackerNum); isConfUpdate = true; } @@ -118,32 +97,25 @@ public class DoRebalanceTransitionCallback extends BaseCallback { isSetTaskInfo = true; // If everything is OK, write topology code into disk - StormConfig.write_nimbus_topology_code(stormConf, topologyid, - Utils.serialize(rawNewTopology)); + StormConfig.write_nimbus_topology_code(stormConf, topologyid, Utils.serialize(rawNewTopology)); // Update topology conf if worker num has been updated Set<Object> keys = conf.keySet(); - Integer workerNum = - JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)); + Integer workerNum = JStormUtils.parseInt(conf.get(Config.TOPOLOGY_WORKERS)); if (workerNum != null) { - Integer oldWorkerNum = - JStormUtils.parseInt(topoConf - .get(Config.TOPOLOGY_WORKERS)); + Integer oldWorkerNum = JStormUtils.parseInt(topoConf.get(Config.TOPOLOGY_WORKERS)); topoConf.put(Config.TOPOLOGY_WORKERS, workerNum); isConfUpdate = true; - LOG.info("Update worker num from " + oldWorkerNum + " to " - + workerNum); + LOG.info("Update worker num from " + oldWorkerNum + " to " + workerNum); } if (keys.contains(Config.ISOLATION_SCHEDULER_MACHINES)) { - topoConf.put(Config.ISOLATION_SCHEDULER_MACHINES, - conf.get(Config.ISOLATION_SCHEDULER_MACHINES)); + topoConf.put(Config.ISOLATION_SCHEDULER_MACHINES, conf.get(Config.ISOLATION_SCHEDULER_MACHINES)); } if (isConfUpdate) { - StormConfig.write_nimbus_topology_conf(stormConf, - topologyid, topoConf); + StormConfig.write_nimbus_topology_conf(stormConf, topologyid, topoConf); } } @@ -153,85 +125,66 @@ public class DoRebalanceTransitionCallback extends BaseCallback { event.setScratch(true); event.setOldStatus(oldStatus); event.setReassign(reassign); - + if (conf != null) + event.setScaleTopology(true); TopologyAssign.push(event); + event.waitFinish(); } catch (Exception e) { LOG.error("do-rebalance error!", e); // Rollback the changes on ZK if (isSetTaskInfo) { - try { - StormClusterState clusterState = - data.getStormClusterState(); - clusterState.remove_task(topologyid, newTasks); - } catch (Exception e1) { - LOG.error( - "Failed to rollback the changes on ZK for task-" - + newTasks, e); - } + try { + StormClusterState clusterState = data.getStormClusterState(); + clusterState.remove_task(topologyid, newTasks); + } catch (Exception e1) { + LOG.error("Failed to rollback the changes on ZK for task-" + newTasks, e); } } + } DelayStatusTransitionCallback delayCallback = - new DelayStatusTransitionCallback(data, topologyid, oldStatus, - StatusType.rebalancing, StatusType.done_rebalance); + new DelayStatusTransitionCallback(data, topologyid, oldStatus, StatusType.rebalancing, StatusType.done_rebalance); return delayCallback.execute(); } - private void setTaskInfo(StormTopology oldTopology, - StormTopology newTopology) throws Exception { + private void setTaskInfo(StormTopology oldTopology, StormTopology newTopology) throws Exception { StormClusterState clusterState = data.getStormClusterState(); // Retrieve the max task ID - TreeSet<Integer> taskIds = - new TreeSet<Integer>(clusterState.task_ids(topologyid)); + TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids(topologyid)); int cnt = taskIds.descendingIterator().next(); cnt = setBoltInfo(oldTopology, newTopology, cnt, clusterState); cnt = setSpoutInfo(oldTopology, newTopology, cnt, clusterState); } - private int setBoltInfo(StormTopology oldTopology, - StormTopology newTopology, int cnt, StormClusterState clusterState) - throws Exception { + private int setBoltInfo(StormTopology oldTopology, StormTopology newTopology, int cnt, StormClusterState clusterState) throws Exception { Map<String, Bolt> oldBolts = oldTopology.get_bolts(); Map<String, Bolt> bolts = newTopology.get_bolts(); for (Entry<String, Bolt> entry : oldBolts.entrySet()) { String boltName = entry.getKey(); Bolt oldBolt = entry.getValue(); Bolt bolt = bolts.get(boltName); - if (oldBolt.get_common().get_parallelism_hint() > bolt.get_common() - .get_parallelism_hint()) { - int removedTaskNum = - oldBolt.get_common().get_parallelism_hint() - - bolt.get_common().get_parallelism_hint(); - TreeSet<Integer> taskIds = - new TreeSet<Integer>( - clusterState.task_ids_by_componentId( - topologyid, boltName)); - Iterator<Integer> descendIterator = - taskIds.descendingIterator(); + if (oldBolt.get_common().get_parallelism_hint() > bolt.get_common().get_parallelism_hint()) { + int removedTaskNum = oldBolt.get_common().get_parallelism_hint() - bolt.get_common().get_parallelism_hint(); + TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids_by_componentId(topologyid, boltName)); + Iterator<Integer> descendIterator = taskIds.descendingIterator(); while (--removedTaskNum >= 0) { int taskId = descendIterator.next(); removeTask(topologyid, taskId, clusterState); - LOG.info("Remove bolt task, taskId=" + taskId + " for " - + boltName); + LOG.info("Remove bolt task, taskId=" + taskId + " for " + boltName); } - } else if (oldBolt.get_common().get_parallelism_hint() == bolt - .get_common().get_parallelism_hint()) { + } else if (oldBolt.get_common().get_parallelism_hint() == bolt.get_common().get_parallelism_hint()) { continue; } else { - int delta = - bolt.get_common().get_parallelism_hint() - - oldBolt.get_common().get_parallelism_hint(); + int delta = bolt.get_common().get_parallelism_hint() - oldBolt.get_common().get_parallelism_hint(); Map<Integer, TaskInfo> taskInfoMap = new HashMap<Integer, TaskInfo>(); for (int i = 1; i <= delta; i++) { cnt++; - TaskInfo taskInfo = - new TaskInfo((String) entry.getKey(), "bolt"); + TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), "bolt"); taskInfoMap.put(cnt, taskInfo); newTasks.add(cnt); - LOG.info("Setup new bolt task, taskId=" + cnt + " for " - + boltName); + LOG.info("Setup new bolt task, taskId=" + cnt + " for " + boltName); } clusterState.add_task(topologyid, taskInfoMap); } @@ -240,52 +193,35 @@ public class DoRebalanceTransitionCallback extends BaseCallback { return cnt; } - private int setSpoutInfo(StormTopology oldTopology, - StormTopology newTopology, int cnt, StormClusterState clusterState) - throws Exception { + private int setSpoutInfo(StormTopology oldTopology, StormTopology newTopology, int cnt, StormClusterState clusterState) throws Exception { Map<String, SpoutSpec> oldSpouts = oldTopology.get_spouts(); Map<String, SpoutSpec> spouts = newTopology.get_spouts(); for (Entry<String, SpoutSpec> entry : oldSpouts.entrySet()) { String spoutName = entry.getKey(); SpoutSpec oldSpout = entry.getValue(); SpoutSpec spout = spouts.get(spoutName); - if (oldSpout.get_common().get_parallelism_hint() > spout - .get_common().get_parallelism_hint()) { - int removedTaskNum = - oldSpout.get_common().get_parallelism_hint() - - spout.get_common().get_parallelism_hint(); - TreeSet<Integer> taskIds = - new TreeSet<Integer>( - clusterState.task_ids_by_componentId( - topologyid, spoutName)); - Iterator<Integer> descendIterator = - taskIds.descendingIterator(); + if (oldSpout.get_common().get_parallelism_hint() > spout.get_common().get_parallelism_hint()) { + int removedTaskNum = oldSpout.get_common().get_parallelism_hint() - spout.get_common().get_parallelism_hint(); + TreeSet<Integer> taskIds = new TreeSet<Integer>(clusterState.task_ids_by_componentId(topologyid, spoutName)); + Iterator<Integer> descendIterator = taskIds.descendingIterator(); while (--removedTaskNum >= 0) { int taskId = descendIterator.next(); removeTask(topologyid, taskId, clusterState); - LOG.info("Remove spout task, taskId=" + taskId + " for " - + spoutName); + LOG.info("Remove spout task, taskId=" + taskId + " for " + spoutName); } - - - } else if (oldSpout.get_common().get_parallelism_hint() == spout - .get_common().get_parallelism_hint()) { + } else if (oldSpout.get_common().get_parallelism_hint() == spout.get_common().get_parallelism_hint()) { continue; } else { - int delta = - spout.get_common().get_parallelism_hint() - - oldSpout.get_common().get_parallelism_hint(); + int delta = spout.get_common().get_parallelism_hint() - oldSpout.get_common().get_parallelism_hint(); Map<Integer, TaskInfo> taskInfoMap = new HashMap<Integer, TaskInfo>(); for (int i = 1; i <= delta; i++) { cnt++; - TaskInfo taskInfo = - new TaskInfo((String) entry.getKey(), "spout"); + TaskInfo taskInfo = new TaskInfo((String) entry.getKey(), "spout"); taskInfoMap.put(cnt, taskInfo); newTasks.add(cnt); - LOG.info("Setup new spout task, taskId=" + cnt + " for " - + spoutName); + LOG.info("Setup new spout task, taskId=" + cnt + " for " + spoutName); } clusterState.add_task(topologyid, taskInfoMap); } @@ -294,12 +230,10 @@ public class DoRebalanceTransitionCallback extends BaseCallback { return cnt; } - private void removeTask(String topologyId, int taskId, - StormClusterState clusterState) throws Exception { + private void removeTask(String topologyId, int taskId, StormClusterState clusterState) throws Exception { Set<Integer> taskIds = new HashSet<Integer>(taskId); clusterState.remove_task(topologyid, taskIds); - Map<Integer, TkHbCacheTime> TkHbs = - data.getTaskHeartbeatsCache(topologyid, false); + Map<Integer, TkHbCacheTime> TkHbs = data.getTaskHeartbeatsCache(topologyid, false); if (TkHbs != null) { TkHbs.remove(taskId); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java index 4dad890..e169fab 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/KillTransitionCallback.java @@ -19,15 +19,14 @@ package com.alibaba.jstorm.callback.impl; import com.alibaba.jstorm.daemon.nimbus.NimbusData; import com.alibaba.jstorm.daemon.nimbus.StatusType; +import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable; /** * The action when nimbus receive killed command. - * - * 1. change current topology status as killed 2. one TIMEOUT seconds later, do - * remove action, which remove topology from ZK + * <p/> + * 1. change current topology status as killed 2. one TIMEOUT seconds later, do remove action, which remove topology from ZK * * @author Longda - * */ public class KillTransitionCallback extends DelayStatusTransitionCallback { @@ -35,4 +34,15 @@ public class KillTransitionCallback extends DelayStatusTransitionCallback { super(data, topologyid, null, StatusType.killed, StatusType.remove); } + @Override + public <T> Object execute(T... args) { + TopologyMetricsRunnable.KillTopologyEvent event = new TopologyMetricsRunnable.KillTopologyEvent(); + event.clusterName = this.data.getClusterName(); + event.topologyId = this.topologyid; + event.timestamp = System.currentTimeMillis(); + + this.data.getMetricRunnable().pushEvent(event); + return super.execute(args); + } + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java index 1b4841c..ffa2ec8 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/ReassignTransitionCallback.java @@ -24,9 +24,8 @@ import com.alibaba.jstorm.daemon.nimbus.TopologyAssign; import com.alibaba.jstorm.daemon.nimbus.TopologyAssignEvent; /** - * 1. every Config.NIMBUS_MONITOR_FREQ_SECS will call MonitorRunnable 2. - * MonitorRunnable will call NimbusData.transition 3. NimbusData.transition will - * this callback + * 1. every Config.NIMBUS_MONITOR_FREQ_SECS will call MonitorRunnable 2. MonitorRunnable will call NimbusData.transition 3. NimbusData.transition will this + * callback * * */ @@ -42,8 +41,7 @@ public class ReassignTransitionCallback extends BaseCallback { this.oldStatus = null; } - public ReassignTransitionCallback(NimbusData data, String topologyid, - StormStatus oldStatus) { + public ReassignTransitionCallback(NimbusData data, String topologyid, StormStatus oldStatus) { this.data = data; this.topologyid = topologyid; this.oldStatus = oldStatus; @@ -59,6 +57,7 @@ public class ReassignTransitionCallback extends BaseCallback { assignEvent.setOldStatus(oldStatus); TopologyAssign.push(assignEvent); + assignEvent.waitFinish(); return null; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java index f65f542..476f4f5 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RebalanceTransitionCallback.java @@ -22,21 +22,17 @@ import com.alibaba.jstorm.daemon.nimbus.NimbusData; import com.alibaba.jstorm.daemon.nimbus.StatusType; /** - * The action when nimbus receive rebalance command. Rebalance command is only - * valid when current status is active + * The action when nimbus receive rebalance command. Rebalance command is only valid when current status is active * - * 1. change current topology status as rebalancing 2. do_rebalance action after - * 2 * TIMEOUT seconds + * 1. change current topology status as rebalancing 2. do_rebalance action after 2 * TIMEOUT seconds * * @author Lixin/Longda * */ public class RebalanceTransitionCallback extends DelayStatusTransitionCallback { - public RebalanceTransitionCallback(NimbusData data, String topologyid, - StormStatus status) { - super(data, topologyid, status, StatusType.rebalancing, - StatusType.do_rebalance); + public RebalanceTransitionCallback(NimbusData data, String topologyid, StormStatus status) { + super(data, topologyid, status, StatusType.rebalancing, StatusType.do_rebalance); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java index 231d8e9..8052c40 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/RemoveTransitionCallback.java @@ -35,8 +35,7 @@ import com.alibaba.jstorm.daemon.nimbus.NimbusUtils; */ public class RemoveTransitionCallback extends BaseCallback { - private static Logger LOG = LoggerFactory - .getLogger(RemoveTransitionCallback.class); + private static Logger LOG = LoggerFactory.getLogger(RemoveTransitionCallback.class); private NimbusData data; private String topologyid; @@ -51,13 +50,13 @@ public class RemoveTransitionCallback extends BaseCallback { LOG.info("Begin to remove topology: " + topologyid); try { - StormBase stormBase = - data.getStormClusterState().storm_base(topologyid, null); + StormBase stormBase = data.getStormClusterState().storm_base(topologyid, null); if (stormBase == null) { LOG.info("Topology " + topologyid + " has been removed "); return null; } data.getStormClusterState().remove_storm(topologyid); + data.getTasksHeartbeat().remove(topologyid); NimbusUtils.removeTopologyTaskTimeout(data, topologyid); LOG.info("Successfully removed ZK items topology: " + topologyid); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java deleted file mode 100644 index ca4e0ee..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateConfTransitionCallback.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.callback.impl; - -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.callback.BaseCallback; -import com.alibaba.jstorm.cluster.StormClusterState; -import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.cluster.StormStatus; -import com.alibaba.jstorm.daemon.nimbus.NimbusData; -import com.alibaba.jstorm.schedule.Assignment; -import com.alibaba.jstorm.schedule.Assignment.AssignmentType; - -/** - * Update user configuration - * - * @author Basti.lj - */ -public class UpdateConfTransitionCallback extends BaseCallback { - - private static Logger LOG = LoggerFactory - .getLogger(DelayStatusTransitionCallback.class); - - public static final int DEFAULT_DELAY_SECONDS = 30; - - private NimbusData data; - private String topologyId; - private StormStatus currentStatus; - - public UpdateConfTransitionCallback(NimbusData data, String topologyId, - StormStatus currentStatus) { - this.data = data; - this.topologyId = topologyId; - this.currentStatus = currentStatus; - } - - @Override - public <T> Object execute(T... args) { - StormClusterState clusterState = data.getStormClusterState(); - try { - Map userConf = (Map) args[0]; - Map topoConf = - StormConfig.read_nimbus_topology_conf(data.getConf(), - topologyId); - topoConf.putAll(userConf); - StormConfig.write_nimbus_topology_conf(data.getConf(), topologyId, topoConf); - - Assignment assignment = - clusterState.assignment_info(topologyId, null); - assignment.setAssignmentType(AssignmentType.Config); - assignment.updateTimeStamp(); - clusterState.set_assignment(topologyId, assignment); - LOG.info("Successfully update new config to ZK for " + topologyId); - } catch (Exception e) { - LOG.error("Failed to update user configuartion.", e); - } - return currentStatus; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java new file mode 100644 index 0000000..706b98b --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/callback/impl/UpdateTopologyTransitionCallback.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.callback.impl; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.jstorm.callback.BaseCallback; +import com.alibaba.jstorm.cluster.StormClusterState; +import com.alibaba.jstorm.cluster.StormConfig; +import com.alibaba.jstorm.cluster.StormStatus; +import com.alibaba.jstorm.daemon.nimbus.NimbusData; +import com.alibaba.jstorm.schedule.Assignment; +import com.alibaba.jstorm.schedule.Assignment.AssignmentType; + +/** + * Update user configuration + * + * @author Basti.lj + */ +public class UpdateTopologyTransitionCallback extends BaseCallback { + + private static Logger LOG = LoggerFactory.getLogger(DelayStatusTransitionCallback.class); + + public static final int DEFAULT_DELAY_SECONDS = 30; + + private NimbusData data; + private String topologyId; + private StormStatus currentStatus; + + public UpdateTopologyTransitionCallback(NimbusData data, String topologyId, StormStatus currentStatus) { + this.data = data; + this.topologyId = topologyId; + this.currentStatus = currentStatus; + } + + @Override + public <T> Object execute(T... args) { + StormClusterState clusterState = data.getStormClusterState(); + try { + Map userConf = (Map) args[0]; + Map topoConf = StormConfig.read_nimbus_topology_conf(data.getConf(), topologyId); + topoConf.putAll(userConf); + StormConfig.write_nimbus_topology_conf(data.getConf(), topologyId, topoConf); + + Assignment assignment = clusterState.assignment_info(topologyId, null); + assignment.setAssignmentType(AssignmentType.UpdateTopology); + assignment.updateTimeStamp(); + clusterState.set_assignment(topologyId, assignment); + LOG.info("Successfully update topology information to ZK for " + topologyId); + } catch (Exception e) { + LOG.error("Failed to update topology.", e); + } + return currentStatus; + } + +}
