Fix mixed tabs and spaces; remove FIXME debug logging
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1a0d4bdd Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1a0d4bdd Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1a0d4bdd Branch: refs/heads/0.9.3-branch Commit: 1a0d4bdd735ba0ade42f6777a4c47affec931557 Parents: ca5874c Author: Jungtaek Lim <[email protected]> Authored: Wed Oct 8 23:06:24 2014 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Wed Oct 8 23:06:24 2014 +0900 ---------------------------------------------------------------------- .../jvm/backtype/storm/spout/ShellSpout.java | 86 +++---- .../src/jvm/backtype/storm/task/ShellBolt.java | 247 +++++++++---------- 2 files changed, 165 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/1a0d4bdd/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java index 494ac6c..fc2ddf2 100644 --- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java +++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java @@ -47,11 +47,11 @@ public class ShellSpout implements ISpout { private SpoutMsg _spoutMsg; - private int workerTimeoutMills; - private Timer heartBeatTimer; - private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); + private int workerTimeoutMills; + private Timer heartBeatTimer; + private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); - public ShellSpout(ShellComponent component) { + public ShellSpout(ShellComponent component) { this(component.get_execution_command(), component.get_script()); } @@ -64,18 +64,18 @@ public class ShellSpout implements ISpout { _collector = collector; _context = context; - workerTimeoutMills = 1000 * 
RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); + workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); _process = new ShellProcess(_command); Number subpid = _process.launch(stormConf, context); LOG.info("Launched subprocess with pid " + subpid); - heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); + heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); } public void close() { - heartBeatTimer.cancel(); + heartBeatTimer.cancel(); _process.destroy(); } @@ -145,9 +145,9 @@ public class ShellSpout implements ISpout { throw new IllegalArgumentException("Command not found in spout message: " + shellMsg); } - setHeartbeat(); + setHeartbeat(); - if (command.equals("sync")) { + if (command.equals("sync")) { return; } else if (command.equals("log")) { handleLog(shellMsg); @@ -176,7 +176,7 @@ public class ShellSpout implements ISpout { } } - private void handleLog(ShellMsg shellMsg) { + private void handleLog(ShellMsg shellMsg) { String msg = shellMsg.getMsg(); msg = "ShellLog " + _process.getProcessInfoString() + " " + msg; ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel(); @@ -205,52 +205,52 @@ public class ShellSpout implements ISpout { @Override public void activate() { - LOG.info("Start checking heartbeat..."); - // prevent timer to check heartbeat based on last thing before activate - setHeartbeat(); - heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000); + LOG.info("Start checking heartbeat..."); + // prevent timer to check heartbeat based on last thing before activate + setHeartbeat(); + heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000); } @Override public void deactivate() { - heartBeatTimer.cancel(); + heartBeatTimer.cancel(); } - private void setHeartbeat() { - lastHeartbeatTimestamp.set(System.currentTimeMillis()); - } + private void setHeartbeat() { + 
lastHeartbeatTimestamp.set(System.currentTimeMillis()); + } - private long getLastHeartbeat() { - return lastHeartbeatTimestamp.get(); - } + private long getLastHeartbeat() { + return lastHeartbeatTimestamp.get(); + } - private void die(Throwable exception) { - heartBeatTimer.cancel(); + private void die(Throwable exception) { + heartBeatTimer.cancel(); - LOG.error("Halting process: ShellSpout died.", exception); - _collector.reportError(exception); - System.exit(11); - } + LOG.error("Halting process: ShellSpout died.", exception); + _collector.reportError(exception); + System.exit(11); + } - private class SpoutHeartbeatTimerTask extends TimerTask { - private ShellSpout spout; + private class SpoutHeartbeatTimerTask extends TimerTask { + private ShellSpout spout; - public SpoutHeartbeatTimerTask(ShellSpout spout) { - this.spout = spout; - } + public SpoutHeartbeatTimerTask(ShellSpout spout) { + this.spout = spout; + } - @Override - public void run() { - long currentTimeMillis = System.currentTimeMillis(); - long lastHeartbeat = getLastHeartbeat(); + @Override + public void run() { + long currentTimeMillis = System.currentTimeMillis(); + long lastHeartbeat = getLastHeartbeat(); - LOG.debug("current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat - + ", worker timeout (ms) : " + workerTimeoutMills); + LOG.debug("current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat + + ", worker timeout (ms) : " + workerTimeoutMills); - if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { - spout.die(new RuntimeException("subprocess heartbeat timeout")); - } - } - } + if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { + spout.die(new RuntimeException("subprocess heartbeat timeout")); + } + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a0d4bdd/storm-core/src/jvm/backtype/storm/task/ShellBolt.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java index cbb8e2b..cb70649 100644 --- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java +++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java @@ -69,8 +69,8 @@ import org.slf4j.LoggerFactory; * </pre> */ public class ShellBolt implements IBolt { - public static final String HEARTBEAT_STREAM_ID = "__heartbeat"; - public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class); + public static final String HEARTBEAT_STREAM_ID = "__heartbeat"; + public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class); Process _subprocess; OutputCollector _collector; Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>(); @@ -87,11 +87,11 @@ public class ShellBolt implements IBolt { private TopologyContext _context; - private int workerTimeoutMills; - private Timer heartBeatTimer; - private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); + private int workerTimeoutMills; + private Timer heartBeatTimer; + private AtomicLong lastHeartbeatTimestamp = new AtomicLong(); - public ShellBolt(ShellComponent component) { + public ShellBolt(ShellComponent component) { this(component.get_execution_command(), component.get_script()); } @@ -110,7 +110,7 @@ public class ShellBolt implements IBolt { _context = context; - workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); + workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)); _process = new ShellProcess(_command); @@ -125,12 +125,12 @@ public class ShellBolt implements IBolt { _writerThread = new Thread(new BoltWriterRunnable()); _writerThread.start(); - heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); - heartBeatTimer.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1000, 1 * 1000); + heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true); + 
heartBeatTimer.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1000, 1 * 1000); - LOG.info("Start checking heartbeat..."); - setHeartbeat(); - } + LOG.info("Start checking heartbeat..."); + setHeartbeat(); + } public void execute(Tuple input) { if (_exception != null) { @@ -141,7 +141,7 @@ public class ShellBolt implements IBolt { String genId = Long.toString(_rand.nextLong()); _inputs.put(genId, input); try { - BoltMsg boltMsg = createBoltMessage(input, genId); + BoltMsg boltMsg = createBoltMessage(input, genId); _pendingWrites.put(boltMsg); } catch(InterruptedException e) { @@ -150,19 +150,19 @@ public class ShellBolt implements IBolt { } } - private BoltMsg createBoltMessage(Tuple input, String genId) { - BoltMsg boltMsg = new BoltMsg(); - boltMsg.setId(genId); - boltMsg.setComp(input.getSourceComponent()); - boltMsg.setStream(input.getSourceStreamId()); - boltMsg.setTask(input.getSourceTask()); - boltMsg.setTuple(input.getValues()); - return boltMsg; - } - - public void cleanup() { + private BoltMsg createBoltMessage(Tuple input, String genId) { + BoltMsg boltMsg = new BoltMsg(); + boltMsg.setId(genId); + boltMsg.setComp(input.getSourceComponent()); + boltMsg.setStream(input.getSourceStreamId()); + boltMsg.setTask(input.getSourceTask()); + boltMsg.setTuple(input.getValues()); + return boltMsg; + } + + public void cleanup() { _running = false; - heartBeatTimer.cancel(); + heartBeatTimer.cancel(); _writerThread.interrupt(); _readerThread.interrupt(); _process.destroy(); @@ -170,9 +170,6 @@ public class ShellBolt implements IBolt { } private void handleAck(Object id) { - // FIXME : debug, should be removed - LOG.debug("Ack... 
id : " + id); - Tuple acked = _inputs.remove(id); if(acked==null) { throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id); @@ -272,15 +269,15 @@ public class ShellBolt implements IBolt { } } - private void setHeartbeat() { - lastHeartbeatTimestamp.set(System.currentTimeMillis()); - } + private void setHeartbeat() { + lastHeartbeatTimestamp.set(System.currentTimeMillis()); + } - private long getLastHeartbeat() { - return lastHeartbeatTimestamp.get(); - } + private long getLastHeartbeat() { + return lastHeartbeatTimestamp.get(); + } - private void die(Throwable exception) { + private void die(Throwable exception) { String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); _exception = new RuntimeException(processInfo, exception); LOG.error("Halting process: ShellBolt died.", exception); @@ -290,94 +287,94 @@ public class ShellBolt implements IBolt { } } - private class BoltHeartbeatTimerTask extends TimerTask { - private ShellBolt bolt; - - public BoltHeartbeatTimerTask(ShellBolt bolt) { - this.bolt = bolt; - } - - @Override - public void run() { - long currentTimeMillis = System.currentTimeMillis(); - long lastHeartbeat = getLastHeartbeat(); - - LOG.debug("BOLT - current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat - + ", worker timeout (ms) : " + workerTimeoutMills); - - if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { - bolt.die(new RuntimeException("subprocess heartbeat timeout")); - } - - String genId = Long.toString(_rand.nextLong()); - try { - _pendingWrites.put(createHeartbeatBoltMessage(genId)); - } catch(InterruptedException e) { - String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); - bolt.die(new RuntimeException("Error during multilang processing " + processInfo, e)); - } - } - - private BoltMsg createHeartbeatBoltMessage(String genId) { - BoltMsg msg = new BoltMsg(); - msg.setId(genId); - 
msg.setTask(Constants.SYSTEM_TASK_ID); - msg.setStream(HEARTBEAT_STREAM_ID); - msg.setTuple(new ArrayList<Object>()); - return msg; - } - } - - private class BoltReaderRunnable implements Runnable { - public void run() { - while (_running) { - try { - ShellMsg shellMsg = _process.readShellMsg(); - - String command = shellMsg.getCommand(); - if (command == null) { - throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg); - } - if (command.equals("sync")) { - setHeartbeat(); - } else if(command.equals("ack")) { - handleAck(shellMsg.getId()); - } else if (command.equals("fail")) { - handleFail(shellMsg.getId()); - } else if (command.equals("error")) { - handleError(shellMsg.getMsg()); - } else if (command.equals("log")) { - handleLog(shellMsg); - } else if (command.equals("emit")) { - handleEmit(shellMsg); - } else if (command.equals("metrics")) { - handleMetrics(shellMsg); - } - } catch (InterruptedException e) { - } catch (Throwable t) { - die(t); - } - } - } - } - - private class BoltWriterRunnable implements Runnable { - public void run() { - while (_running) { - try { - Object write = _pendingWrites.poll(1, SECONDS); - if (write instanceof BoltMsg) { - _process.writeBoltMsg((BoltMsg) write); - } else if (write instanceof List<?>) { - _process.writeTaskIds((List<Integer>)write); - } else if (write != null) { - throw new RuntimeException("Unknown class type to write: " + write.getClass().getName()); - } - } catch (InterruptedException e) { - } catch (Throwable t) { - die(t); - } - } - } - } + private class BoltHeartbeatTimerTask extends TimerTask { + private ShellBolt bolt; + + public BoltHeartbeatTimerTask(ShellBolt bolt) { + this.bolt = bolt; + } + + @Override + public void run() { + long currentTimeMillis = System.currentTimeMillis(); + long lastHeartbeat = getLastHeartbeat(); + + LOG.debug("BOLT - current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat + + ", worker timeout (ms) : " + workerTimeoutMills); + + 
if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) { + bolt.die(new RuntimeException("subprocess heartbeat timeout")); + } + + String genId = Long.toString(_rand.nextLong()); + try { + _pendingWrites.put(createHeartbeatBoltMessage(genId)); + } catch(InterruptedException e) { + String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString(); + bolt.die(new RuntimeException("Error during multilang processing " + processInfo, e)); + } + } + + private BoltMsg createHeartbeatBoltMessage(String genId) { + BoltMsg msg = new BoltMsg(); + msg.setId(genId); + msg.setTask(Constants.SYSTEM_TASK_ID); + msg.setStream(HEARTBEAT_STREAM_ID); + msg.setTuple(new ArrayList<Object>()); + return msg; + } + } + + private class BoltReaderRunnable implements Runnable { + public void run() { + while (_running) { + try { + ShellMsg shellMsg = _process.readShellMsg(); + + String command = shellMsg.getCommand(); + if (command == null) { + throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg); + } + if (command.equals("sync")) { + setHeartbeat(); + } else if(command.equals("ack")) { + handleAck(shellMsg.getId()); + } else if (command.equals("fail")) { + handleFail(shellMsg.getId()); + } else if (command.equals("error")) { + handleError(shellMsg.getMsg()); + } else if (command.equals("log")) { + handleLog(shellMsg); + } else if (command.equals("emit")) { + handleEmit(shellMsg); + } else if (command.equals("metrics")) { + handleMetrics(shellMsg); + } + } catch (InterruptedException e) { + } catch (Throwable t) { + die(t); + } + } + } + } + + private class BoltWriterRunnable implements Runnable { + public void run() { + while (_running) { + try { + Object write = _pendingWrites.poll(1, SECONDS); + if (write instanceof BoltMsg) { + _process.writeBoltMsg((BoltMsg) write); + } else if (write instanceof List<?>) { + _process.writeTaskIds((List<Integer>)write); + } else if (write != null) { + throw new RuntimeException("Unknown 
class type to write: " + write.getClass().getName()); + } + } catch (InterruptedException e) { + } catch (Throwable t) { + die(t); + } + } + } + } }
