Fix mixed tab / space, remove FIXME

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1a0d4bdd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1a0d4bdd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1a0d4bdd

Branch: refs/heads/0.9.3-branch
Commit: 1a0d4bdd735ba0ade42f6777a4c47affec931557
Parents: ca5874c
Author: Jungtaek Lim <[email protected]>
Authored: Wed Oct 8 23:06:24 2014 +0900
Committer: Jungtaek Lim <[email protected]>
Committed: Wed Oct 8 23:06:24 2014 +0900

----------------------------------------------------------------------
 .../jvm/backtype/storm/spout/ShellSpout.java    |  86 +++----
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 247 +++++++++----------
 2 files changed, 165 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1a0d4bdd/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index 494ac6c..fc2ddf2 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -47,11 +47,11 @@ public class ShellSpout implements ISpout {
     
     private SpoutMsg _spoutMsg;
 
-       private int workerTimeoutMills;
-       private Timer heartBeatTimer;
-       private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+    private int workerTimeoutMills;
+    private Timer heartBeatTimer;
+    private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
 
-       public ShellSpout(ShellComponent component) {
+    public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
 
@@ -64,18 +64,18 @@ public class ShellSpout implements ISpout {
         _collector = collector;
         _context = context;
 
-               workerTimeoutMills = 1000 * 
RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
 
         _process = new ShellProcess(_command);
 
         Number subpid = _process.launch(stormConf, context);
         LOG.info("Launched subprocess with pid " + subpid);
 
-               heartBeatTimer = new Timer(context.getThisTaskId() + 
"-heartbeatTimer", true);
+        heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true);
     }
 
     public void close() {
-               heartBeatTimer.cancel();
+        heartBeatTimer.cancel();
         _process.destroy();
     }
 
@@ -145,9 +145,9 @@ public class ShellSpout implements ISpout {
                     throw new IllegalArgumentException("Command not found in spout message: " + shellMsg);
                 }
 
-                               setHeartbeat();
+                setHeartbeat();
 
-                               if (command.equals("sync")) {
+                if (command.equals("sync")) {
                     return;
                 } else if (command.equals("log")) {
                     handleLog(shellMsg);
@@ -176,7 +176,7 @@ public class ShellSpout implements ISpout {
         }
     }
 
-       private void handleLog(ShellMsg shellMsg) {
+    private void handleLog(ShellMsg shellMsg) {
         String msg = shellMsg.getMsg();
         msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
         ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
@@ -205,52 +205,52 @@ public class ShellSpout implements ISpout {
 
     @Override
     public void activate() {
-               LOG.info("Start checking heartbeat...");
-               // prevent timer to check heartbeat based on last thing before 
activate
-               setHeartbeat();
-               heartBeatTimer.scheduleAtFixedRate(new 
SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
+        LOG.info("Start checking heartbeat...");
+        // prevent timer to check heartbeat based on last thing before activate
+        setHeartbeat();
+        heartBeatTimer.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1000, 1 * 1000);
     }
 
     @Override
     public void deactivate() {
-               heartBeatTimer.cancel();
+        heartBeatTimer.cancel();
     }
 
-       private void setHeartbeat() {
-               lastHeartbeatTimestamp.set(System.currentTimeMillis());
-       }
+    private void setHeartbeat() {
+        lastHeartbeatTimestamp.set(System.currentTimeMillis());
+    }
 
-       private long getLastHeartbeat() {
-               return lastHeartbeatTimestamp.get();
-       }
+    private long getLastHeartbeat() {
+        return lastHeartbeatTimestamp.get();
+    }
 
-       private void die(Throwable exception) {
-               heartBeatTimer.cancel();
+    private void die(Throwable exception) {
+        heartBeatTimer.cancel();
 
-               LOG.error("Halting process: ShellSpout died.", exception);
-               _collector.reportError(exception);
-               System.exit(11);
-       }
+        LOG.error("Halting process: ShellSpout died.", exception);
+        _collector.reportError(exception);
+        System.exit(11);
+    }
 
-       private class SpoutHeartbeatTimerTask extends TimerTask {
-               private ShellSpout spout;
+    private class SpoutHeartbeatTimerTask extends TimerTask {
+        private ShellSpout spout;
 
-               public SpoutHeartbeatTimerTask(ShellSpout spout) {
-                       this.spout = spout;
-               }
+        public SpoutHeartbeatTimerTask(ShellSpout spout) {
+            this.spout = spout;
+        }
 
-               @Override
-               public void run() {
-                       long currentTimeMillis = System.currentTimeMillis();
-                       long lastHeartbeat = getLastHeartbeat();
+        @Override
+        public void run() {
+            long currentTimeMillis = System.currentTimeMillis();
+            long lastHeartbeat = getLastHeartbeat();
 
-                       LOG.debug("current time : " + currentTimeMillis + ", 
last heartbeat : " + lastHeartbeat
-                                       + ", worker timeout (ms) : " + 
workerTimeoutMills);
+            LOG.debug("current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat
+                    + ", worker timeout (ms) : " + workerTimeoutMills);
 
-                       if (currentTimeMillis - lastHeartbeat > 
workerTimeoutMills) {
-                               spout.die(new RuntimeException("subprocess 
heartbeat timeout"));
-                       }
-               }
-       }
+            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+                spout.die(new RuntimeException("subprocess heartbeat timeout"));
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1a0d4bdd/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index cbb8e2b..cb70649 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -69,8 +69,8 @@ import org.slf4j.LoggerFactory;
  * </pre>
  */
 public class ShellBolt implements IBolt {
-       public static final String HEARTBEAT_STREAM_ID = "__heartbeat";
-       public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
+    public static final String HEARTBEAT_STREAM_ID = "__heartbeat";
+    public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
     Process _subprocess;
     OutputCollector _collector;
     Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
@@ -87,11 +87,11 @@ public class ShellBolt implements IBolt {
     
     private TopologyContext _context;
 
-       private int workerTimeoutMills;
-       private Timer heartBeatTimer;
-       private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
+    private int workerTimeoutMills;
+    private Timer heartBeatTimer;
+    private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
 
-       public ShellBolt(ShellComponent component) {
+    public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
     }
 
@@ -110,7 +110,7 @@ public class ShellBolt implements IBolt {
 
         _context = context;
 
-               workerTimeoutMills = 1000 * 
RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
 
         _process = new ShellProcess(_command);
 
@@ -125,12 +125,12 @@ public class ShellBolt implements IBolt {
         _writerThread = new Thread(new BoltWriterRunnable());
         _writerThread.start();
 
-               heartBeatTimer = new Timer(context.getThisTaskId() + 
"-heartbeatTimer", true);
-               heartBeatTimer.scheduleAtFixedRate(new 
BoltHeartbeatTimerTask(this), 1000, 1 * 1000);
+        heartBeatTimer = new Timer(context.getThisTaskId() + "-heartbeatTimer", true);
+        heartBeatTimer.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1000, 1 * 1000);
 
-               LOG.info("Start checking heartbeat...");
-               setHeartbeat();
-       }
+        LOG.info("Start checking heartbeat...");
+        setHeartbeat();
+    }
 
     public void execute(Tuple input) {
         if (_exception != null) {
@@ -141,7 +141,7 @@ public class ShellBolt implements IBolt {
         String genId = Long.toString(_rand.nextLong());
         _inputs.put(genId, input);
         try {
-                       BoltMsg boltMsg = createBoltMessage(input, genId);
+            BoltMsg boltMsg = createBoltMessage(input, genId);
 
             _pendingWrites.put(boltMsg);
         } catch(InterruptedException e) {
@@ -150,19 +150,19 @@ public class ShellBolt implements IBolt {
         }
     }
 
-       private BoltMsg createBoltMessage(Tuple input, String genId) {
-               BoltMsg boltMsg = new BoltMsg();
-               boltMsg.setId(genId);
-               boltMsg.setComp(input.getSourceComponent());
-               boltMsg.setStream(input.getSourceStreamId());
-               boltMsg.setTask(input.getSourceTask());
-               boltMsg.setTuple(input.getValues());
-               return boltMsg;
-       }
-
-       public void cleanup() {
+    private BoltMsg createBoltMessage(Tuple input, String genId) {
+        BoltMsg boltMsg = new BoltMsg();
+        boltMsg.setId(genId);
+        boltMsg.setComp(input.getSourceComponent());
+        boltMsg.setStream(input.getSourceStreamId());
+        boltMsg.setTask(input.getSourceTask());
+        boltMsg.setTuple(input.getValues());
+        return boltMsg;
+    }
+
+    public void cleanup() {
         _running = false;
-               heartBeatTimer.cancel();
+        heartBeatTimer.cancel();
         _writerThread.interrupt();
         _readerThread.interrupt();
         _process.destroy();
@@ -170,9 +170,6 @@ public class ShellBolt implements IBolt {
     }
 
     private void handleAck(Object id) {
-               // FIXME : debug, should be removed
-               LOG.debug("Ack... id : " + id);
-
         Tuple acked = _inputs.remove(id);
         if(acked==null) {
             throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
@@ -272,15 +269,15 @@ public class ShellBolt implements IBolt {
         }       
     }
 
-       private void setHeartbeat() {
-               lastHeartbeatTimestamp.set(System.currentTimeMillis());
-       }
+    private void setHeartbeat() {
+        lastHeartbeatTimestamp.set(System.currentTimeMillis());
+    }
 
-       private long getLastHeartbeat() {
-               return lastHeartbeatTimestamp.get();
-       }
+    private long getLastHeartbeat() {
+        return lastHeartbeatTimestamp.get();
+    }
 
-       private void die(Throwable exception) {
+    private void die(Throwable exception) {
         String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
         _exception = new RuntimeException(processInfo, exception);
         LOG.error("Halting process: ShellBolt died.", exception);
@@ -290,94 +287,94 @@ public class ShellBolt implements IBolt {
         }
     }
 
-       private class BoltHeartbeatTimerTask extends TimerTask {
-               private ShellBolt bolt;
-
-               public BoltHeartbeatTimerTask(ShellBolt bolt) {
-                       this.bolt = bolt;
-               }
-
-               @Override
-               public void run() {
-                       long currentTimeMillis = System.currentTimeMillis();
-                       long lastHeartbeat = getLastHeartbeat();
-
-                       LOG.debug("BOLT - current time : " + currentTimeMillis 
+ ", last heartbeat : " + lastHeartbeat
-                                       + ", worker timeout (ms) : " + 
workerTimeoutMills);
-
-                       if (currentTimeMillis - lastHeartbeat > 
workerTimeoutMills) {
-                               bolt.die(new RuntimeException("subprocess 
heartbeat timeout"));
-                       }
-
-                       String genId = Long.toString(_rand.nextLong());
-                       try {
-                               
_pendingWrites.put(createHeartbeatBoltMessage(genId));
-                       } catch(InterruptedException e) {
-                               String processInfo = 
_process.getProcessInfoString() + _process.getProcessTerminationInfoString();
-                               bolt.die(new RuntimeException("Error during 
multilang processing " + processInfo, e));
-                       }
-               }
-
-               private BoltMsg createHeartbeatBoltMessage(String genId) {
-                       BoltMsg msg = new BoltMsg();
-                       msg.setId(genId);
-                       msg.setTask(Constants.SYSTEM_TASK_ID);
-                       msg.setStream(HEARTBEAT_STREAM_ID);
-                       msg.setTuple(new ArrayList<Object>());
-                       return msg;
-               }
-       }
-
-       private class BoltReaderRunnable implements Runnable {
-               public void run() {
-                       while (_running) {
-                               try {
-                                       ShellMsg shellMsg = 
_process.readShellMsg();
-
-                                       String command = shellMsg.getCommand();
-                                       if (command == null) {
-                                               throw new 
IllegalArgumentException("Command not found in bolt message: " + shellMsg);
-                                       }
-                                       if (command.equals("sync")) {
-                                               setHeartbeat();
-                                       } else if(command.equals("ack")) {
-                                               handleAck(shellMsg.getId());
-                                       } else if (command.equals("fail")) {
-                                               handleFail(shellMsg.getId());
-                                       } else if (command.equals("error")) {
-                                               handleError(shellMsg.getMsg());
-                                       } else if (command.equals("log")) {
-                                               handleLog(shellMsg);
-                                       } else if (command.equals("emit")) {
-                                               handleEmit(shellMsg);
-                                       } else if (command.equals("metrics")) {
-                                               handleMetrics(shellMsg);
-                                       }
-                               } catch (InterruptedException e) {
-                               } catch (Throwable t) {
-                                       die(t);
-                               }
-                       }
-               }
-       }
-
-       private class BoltWriterRunnable implements Runnable {
-               public void run() {
-                       while (_running) {
-                               try {
-                                       Object write = _pendingWrites.poll(1, 
SECONDS);
-                                       if (write instanceof BoltMsg) {
-                                               _process.writeBoltMsg((BoltMsg) 
write);
-                                       } else if (write instanceof List<?>) {
-                                               
_process.writeTaskIds((List<Integer>)write);
-                                       } else if (write != null) {
-                                               throw new 
RuntimeException("Unknown class type to write: " + write.getClass().getName());
-                                       }
-                               } catch (InterruptedException e) {
-                               } catch (Throwable t) {
-                                       die(t);
-                               }
-                       }
-               }
-       }
+    private class BoltHeartbeatTimerTask extends TimerTask {
+        private ShellBolt bolt;
+
+        public BoltHeartbeatTimerTask(ShellBolt bolt) {
+            this.bolt = bolt;
+        }
+
+        @Override
+        public void run() {
+            long currentTimeMillis = System.currentTimeMillis();
+            long lastHeartbeat = getLastHeartbeat();
+
+            LOG.debug("BOLT - current time : " + currentTimeMillis + ", last heartbeat : " + lastHeartbeat
+                    + ", worker timeout (ms) : " + workerTimeoutMills);
+
+            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
+                bolt.die(new RuntimeException("subprocess heartbeat timeout"));
+            }
+
+            String genId = Long.toString(_rand.nextLong());
+            try {
+                _pendingWrites.put(createHeartbeatBoltMessage(genId));
+            } catch(InterruptedException e) {
+                String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
+                bolt.die(new RuntimeException("Error during multilang processing " + processInfo, e));
+            }
+        }
+
+        private BoltMsg createHeartbeatBoltMessage(String genId) {
+            BoltMsg msg = new BoltMsg();
+            msg.setId(genId);
+            msg.setTask(Constants.SYSTEM_TASK_ID);
+            msg.setStream(HEARTBEAT_STREAM_ID);
+            msg.setTuple(new ArrayList<Object>());
+            return msg;
+        }
+    }
+
+    private class BoltReaderRunnable implements Runnable {
+        public void run() {
+            while (_running) {
+                try {
+                    ShellMsg shellMsg = _process.readShellMsg();
+
+                    String command = shellMsg.getCommand();
+                    if (command == null) {
+                        throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg);
+                    }
+                    if (command.equals("sync")) {
+                        setHeartbeat();
+                    } else if(command.equals("ack")) {
+                        handleAck(shellMsg.getId());
+                    } else if (command.equals("fail")) {
+                        handleFail(shellMsg.getId());
+                    } else if (command.equals("error")) {
+                        handleError(shellMsg.getMsg());
+                    } else if (command.equals("log")) {
+                        handleLog(shellMsg);
+                    } else if (command.equals("emit")) {
+                        handleEmit(shellMsg);
+                    } else if (command.equals("metrics")) {
+                        handleMetrics(shellMsg);
+                    }
+                } catch (InterruptedException e) {
+                } catch (Throwable t) {
+                    die(t);
+                }
+            }
+        }
+    }
+
+    private class BoltWriterRunnable implements Runnable {
+        public void run() {
+            while (_running) {
+                try {
+                    Object write = _pendingWrites.poll(1, SECONDS);
+                    if (write instanceof BoltMsg) {
+                        _process.writeBoltMsg((BoltMsg) write);
+                    } else if (write instanceof List<?>) {
+                        _process.writeTaskIds((List<Integer>)write);
+                    } else if (write != null) {
+                        throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
+                    }
+                } catch (InterruptedException e) {
+                } catch (Throwable t) {
+                    die(t);
+                }
+            }
+        }
+    }
 }

Reply via email to