Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch f7b8bb259 -> 9b6d15735


STORM-2862: Move multilang logging to a shared class and make this class 
configurable.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc65d3bb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc65d3bb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc65d3bb

Branch: refs/heads/1.1.x-branch
Commit: dc65d3bb9dceab31191ab49e365bd25b17f46377
Parents: f7b8bb2
Author: Heather McCartney <[email protected]>
Authored: Wed Dec 20 10:14:53 2017 +0000
Committer: Heather McCartney <[email protected]>
Committed: Mon Jan 22 09:20:45 2018 +0000

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/Config.java |   8 ++
 .../jvm/org/apache/storm/spout/ShellSpout.java  |  37 ++----
 .../jvm/org/apache/storm/task/ShellBolt.java    |  38 ++-----
 .../storm/utils/DefaultShellLogHandler.java     | 113 +++++++++++++++++++
 .../org/apache/storm/utils/ShellLogHandler.java |  52 +++++++++
 .../jvm/org/apache/storm/utils/ShellUtils.java  |  17 +++
 .../storm/utils/DefaultShellLogHandlerTest.java | 105 +++++++++++++++++
 .../org/apache/storm/utils/ShellUtilsTest.java  | 103 +++++++++++++++++
 8 files changed, 413 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 4f3bd1c..ba543be 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1604,6 +1604,14 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_DEBUG = "topology.debug";
 
     /**
+     * The fully qualified name of a {@link ShellLogHandler} to handle output
+     * from non-JVM processes e.g. "com.mycompany.CustomShellLogHandler". If
+     * not provided, org.apache.storm.utils.DefaultLogHandler will be used.
+     */
+    @isString
+    public static final String TOPOLOGY_MULTILANG_LOG_HANDLER = 
"topology.multilang.log.handler";
+
+    /**
      * The serializer for communication between shell components and non-JVM
      * processes
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java 
b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java
index a5ec72b..afd816b 100644
--- a/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/spout/ShellSpout.java
@@ -24,7 +24,9 @@ import org.apache.storm.metric.api.rpc.IShellMetric;
 import org.apache.storm.multilang.ShellMsg;
 import org.apache.storm.multilang.SpoutMsg;
 import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.ShellLogHandler;
 import org.apache.storm.utils.ShellProcess;
+import org.apache.storm.utils.ShellUtils;
 
 import java.util.Arrays;
 import java.util.Map;
@@ -50,6 +52,7 @@ public class ShellSpout implements ISpout {
     private SpoutOutputCollector _collector;
     private String[] _command;
     private Map<String, String> env = new HashMap<>();
+    private ShellLogHandler _logHandler;
     private ShellProcess _process;
     private volatile boolean _running = true;
     private volatile RuntimeException _exception;
@@ -111,6 +114,9 @@ public class ShellSpout implements ISpout {
         Number subpid = _process.launch(stormConf, context, changeDirectory);
         LOG.info("Launched subprocess with pid " + subpid);
 
+        _logHandler =  ShellUtils.getLogHandler(stormConf);
+        _logHandler.setUpContext(ShellSpout.class, _process, _context);
+
         heartBeatExecutorService = 
MoreExecutors.getExitingScheduledExecutorService(new 
ScheduledThreadPoolExecutor(1));
     }
 
@@ -145,7 +151,6 @@ public class ShellSpout implements ISpout {
         querySubprocess();
     }
 
-    
     private void handleMetrics(ShellMsg shellMsg) {
         //get metric name
         String name = shellMsg.getMetricName();
@@ -191,7 +196,7 @@ public class ShellSpout implements ISpout {
                 if (command.equals("sync")) {
                     return;
                 } else if (command.equals("log")) {
-                    handleLog(shellMsg);
+                    _logHandler.log(shellMsg);
                 } else if (command.equals("error")) {
                     handleError(shellMsg.getMsg());
                 } else if (command.equals("emit")) {
@@ -221,34 +226,6 @@ public class ShellSpout implements ISpout {
         }
     }
 
-
-    private void handleLog(ShellMsg shellMsg) {
-        String msg = shellMsg.getMsg();
-        msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
-        ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
-
-        switch (logLevel) {
-            case TRACE:
-                LOG.trace(msg);
-                break;
-            case DEBUG:
-                LOG.debug(msg);
-                break;
-            case INFO:
-                LOG.info(msg);
-                break;
-            case WARN:
-                LOG.warn(msg);
-                break;
-            case ERROR:
-                LOG.error(msg);
-                break;
-            default:
-                LOG.info(msg);
-                break;
-        }
-    }
-
     private void handleError(String msg) {
         _collector.reportError(new Exception("Shell Process Exception: " + 
msg));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java 
b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
index 3d9f141..d520d07 100644
--- a/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/task/ShellBolt.java
@@ -24,10 +24,12 @@ import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.metric.api.rpc.IShellMetric;
 import org.apache.storm.multilang.BoltMsg;
 import org.apache.storm.multilang.ShellMsg;
-import org.apache.storm.topology.ReportedFailedException;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.ShellBoltMessageQueue;
+import org.apache.storm.utils.ShellLogHandler;
 import org.apache.storm.utils.ShellProcess;
+import org.apache.storm.utils.ShellUtils;
+
 import clojure.lang.RT;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
@@ -77,6 +79,7 @@ public class ShellBolt implements IBolt {
 
     private String[] _command;
     private Map<String, String> env = new HashMap<>();
+    private ShellLogHandler _logHandler;
     private ShellProcess _process;
     private volatile boolean _running = true;
     private volatile Throwable _exception;
@@ -150,6 +153,9 @@ public class ShellBolt implements IBolt {
         Number subpid = _process.launch(stormConf, context, changeDirectory);
         LOG.info("Launched subprocess with pid " + subpid);
 
+        _logHandler = ShellUtils.getLogHandler(stormConf);
+        _logHandler.setUpContext(ShellBolt.class, _process, _context);
+
         // reader
         _readerThread = new Thread(new BoltReaderRunnable());
         _readerThread.start();
@@ -245,34 +251,6 @@ public class ShellBolt implements IBolt {
         }
     }
 
-    private void handleLog(ShellMsg shellMsg) {
-        String msg = shellMsg.getMsg();
-        msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
-        ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
-
-        switch (logLevel) {
-            case TRACE:
-                LOG.trace(msg);
-                break;
-            case DEBUG:
-                LOG.debug(msg);
-                break;
-            case INFO:
-                LOG.info(msg);
-                break;
-            case WARN:
-                LOG.warn(msg);
-                break;
-            case ERROR:
-                LOG.error(msg);
-                _collector.reportError(new ReportedFailedException(msg));
-                break;
-            default:
-                LOG.info(msg);
-                break;
-        }
-    }
-
     private void handleMetrics(ShellMsg shellMsg) {
         //get metric name
         String name = shellMsg.getMetricName();
@@ -370,7 +348,7 @@ public class ShellBolt implements IBolt {
                             handleError(shellMsg.getMsg());
                             break;
                         case "log":
-                            handleLog(shellMsg);
+                            _logHandler.log(shellMsg);
                             break;
                         case "emit":
                             handleEmit(shellMsg);

http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java 
b/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java
new file mode 100644
index 0000000..2004bc1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/utils/DefaultShellLogHandler.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.multilang.ShellMsg;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link ShellLogHandler}.
+ */
+public class DefaultShellLogHandler implements ShellLogHandler {
+
+    private Logger log;
+
+    /**
+     * Save information about the current process.
+     */
+    private ShellProcess process;
+
+    /**
+     * Default constructor; used when loading with
+     * Class.forName(...).newInstance().
+     */
+    public DefaultShellLogHandler() {
+    }
+
+    private Logger getLogger(final Class<?> ownerCls) {
+        return LoggerFactory.getLogger(
+                ownerCls == null ? DefaultShellLogHandler.class : ownerCls);
+    }
+
+    /**
+     * This default implementation saves the {@link ShellProcess} so it can
+     * output the process info string later.
+     * @see {@link ShellLogHandler#setUpContext}
+     *
+     * @param ownerCls
+     *            - the class which instantiated this ShellLogHandler.
+     * @param process
+     *            - the current {@link ShellProcess}.
+     * @param context
+     *            - the current {@link TopologyContext}.
+     */
+    public void setUpContext(final Class<?> ownerCls, final ShellProcess 
process,
+            final TopologyContext context) {
+        this.log = getLogger(ownerCls);
+        this.process = process;
+        // context is not used by the default implementation, but is included
+        // in the interface in case it is useful to subclasses
+    }
+
+    /**
+     * Log the given message.
+     * @see {@link ShellLogHandler#log}
+     *
+     * @param shellMsg
+     *            - the {@link ShellMsg} to log.
+     */
+    public void log(final ShellMsg shellMsg) {
+        if (shellMsg == null) {
+            throw new IllegalArgumentException("shellMsg is required");
+        }
+        String msg = shellMsg.getMsg();
+        if (this.log == null) {
+            this.log = getLogger(null);
+        }
+        if (this.process == null) {
+            msg = "ShellLog " + msg;
+        } else {
+            msg = "ShellLog " + process.getProcessInfoString() + " " + msg;
+        }
+        ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
+
+        switch (logLevel) {
+            case TRACE:
+                log.trace(msg);
+                break;
+            case DEBUG:
+                log.debug(msg);
+                break;
+            case INFO:
+                log.info(msg);
+                break;
+            case WARN:
+                log.warn(msg);
+                break;
+            case ERROR:
+                log.error(msg);
+                break;
+            default:
+                log.info(msg);
+                break;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java 
b/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java
new file mode 100644
index 0000000..8463a9e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/utils/ShellLogHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.utils;
+
+import org.apache.storm.multilang.ShellMsg;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Handle logging from non-JVM processes.
+ */
+public interface ShellLogHandler {
+
+    /**
+     * Called at least once before {@link ShellLogHandler#log} for each
+     * spout and bolt. Allows implementing classes to save information about
+     * the current running context e.g. pid, thread, task.
+     *
+     * @param ownerCls
+     *            - the class which instantiated this ShellLogHandler.
+     * @param process
+     *            - the current {@link ShellProcess}.
+     * @param context
+     *            - the current {@link TopologyContext}.
+     */
+    void setUpContext(Class<?> ownerCls, ShellProcess process,
+            TopologyContext context);
+
+    /**
+     * Called by spouts and bolts when they receive a 'log' command from a
+     * multilang process.
+     *
+     * @param msg
+     *            - the {@link ShellMsg} containing the message to log.
+     */
+    void log(ShellMsg msg);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java 
b/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java
index ef869b0..3b0f934 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/ShellUtils.java
@@ -28,6 +28,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.storm.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -502,4 +503,20 @@ abstract public class ShellUtils {
         }
     }
 
+    public static ShellLogHandler getLogHandler(Map stormConf) {
+        if (stormConf == null) {
+            throw new IllegalArgumentException("Config is required");
+        }
+
+        String logHandlerClassName = null;
+        if (stormConf.containsKey(Config.TOPOLOGY_MULTILANG_LOG_HANDLER)) {
+            try {
+                logHandlerClassName = 
stormConf.get(Config.TOPOLOGY_MULTILANG_LOG_HANDLER).toString();
+                return (ShellLogHandler) 
Class.forName(logHandlerClassName).newInstance();
+            } catch (ClassCastException | InstantiationException | 
IllegalAccessException | ClassNotFoundException e) {
+                throw new RuntimeException("Error loading ShellLogHandler " + 
logHandlerClassName, e);
+            }
+        }
+        return new DefaultShellLogHandler();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java 
b/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java
new file mode 100644
index 0000000..1e2c70d
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/utils/DefaultShellLogHandlerTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.storm.multilang.ShellMsg;
+import org.apache.storm.multilang.ShellMsg.ShellLogLevel;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultShellLogHandlerTest {
+
+    private DefaultShellLogHandler logHandler;
+
+    @Before
+    public void setUp() {
+        logHandler = new DefaultShellLogHandler();
+    }
+
+    private ShellMsg mockMsg() {
+        ShellMsg shellMsg = mock(ShellMsg.class);
+        when(shellMsg.getMsg()).thenReturn("msg");
+        when(shellMsg.getLogLevel()).thenReturn(ShellLogLevel.INFO);
+        return shellMsg;
+    }
+
+    private ShellProcess mockProcess() {
+        ShellProcess process = mock(ShellProcess.class);
+        when(process.getProcessInfoString()).thenReturn("info");
+        return process;
+    }
+
+    /**
+     * It's fine to pass only null arguments to setUpContext.
+     */
+    @Test
+    public void setUpContext_allNull() {
+        ShellMsg msg = mockMsg();
+        logHandler.setUpContext(null, null, null);
+        logHandler.log(msg);
+        verify(msg).getMsg();
+    }
+
+    /**
+     * Calling setUpContext is optional.
+     */
+    @Test
+    public void setUpContext_optional() {
+        ShellMsg msg = mockMsg();
+        logHandler.log(msg);
+        verify(msg).getMsg();
+    }
+
+    /**
+     * A null {@link ShellMsg} will throw IllegalArgumentException.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void handleLog_nullShellMsg() {
+        logHandler.log(null);
+    }
+
+    /**
+     * A null {@link ShellProcess} will not throw an exception.
+     */
+    @Test
+    public void handleLog_nullProcess() {
+        ShellMsg msg = mockMsg();
+        ShellProcess process = mockProcess();
+        logHandler.setUpContext(DefaultShellLogHandlerTest.class, process, 
null);
+        logHandler.log(msg);
+        verify(msg).getMsg();
+    }
+
+    /**
+     * If both {@link ShellMsg} and {@link ShellProcess} are provided, both
+     * will be used to build the log message.
+     */
+    @Test
+    public void handleLog_valid() {
+        ShellMsg msg = mockMsg();
+        ShellProcess process = mockProcess();
+        logHandler.setUpContext(DefaultShellLogHandlerTest.class, process, 
null);
+        logHandler.log(msg);
+        verify(msg).getMsg();
+        verify(process).getProcessInfoString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/dc65d3bb/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java 
b/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java
new file mode 100644
index 0000000..087b33b
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/utils/ShellUtilsTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.utils;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.multilang.ShellMsg;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+
+public class ShellUtilsTest {
+
+    public static class CustomShellLogHandler implements ShellLogHandler {
+        @Override
+        public void setUpContext(Class<?> owner, ShellProcess process,
+                TopologyContext context) {
+        }
+
+        @Override
+        public void log(ShellMsg msg) {    
+        }
+    }
+
+    private Map<String, Object> configureLogHandler(String className) {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(Config.TOPOLOGY_MULTILANG_LOG_HANDLER, className);
+        return conf;
+    }
+
+    /**
+     * A null config will throw IllegalArgumentException.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void getLogHandler_nullConf() {
+        ShellUtils.getLogHandler(null);
+    }
+
+    /**
+     * If a log handler is not configured, {@link DefaultShellLogHandler}
+     * will be returned.
+     */
+    @Test
+    public void getLogHandler_notConfigured() {
+        ShellLogHandler logHandler = ShellUtils.getLogHandler(new 
HashMap<String, Object>());
+        assertTrue(logHandler.getClass() == DefaultShellLogHandler.class);
+    }
+
+    /**
+     * If a log handler cannot be found, a {@link RuntimeException} will be
+     * thrown with {@link ClassNotFoundException} as the cause.
+     */
+    @Test
+    public void getLogHandler_notFound() {
+        try {
+            configureLogHandler("class.not.Found");
+        } catch (RuntimeException e) {
+            assert(e.getCause().getClass() == ClassNotFoundException.class);
+        }
+    }
+
+    /**
+     * If a log handler is not an instance of {@link ShellLogHandler}, a
+     * {@link RuntimeException} will be thrown with {@link ClassCastException}
+     * as the cause.
+     */
+    @Test
+    public void getLogHandler_notAShellLogHandler() {
+        try {
+            configureLogHandler("java.lang.String");
+        } catch (RuntimeException e) {
+            assert(e.getCause().getClass() == ClassCastException.class);
+        }
+    }
+
+    /**
+     * If a log handler is correctly configured, it will be returned.
+     */
+    @Test
+    public void getLogHandler_customHandler() {
+        Map<String, Object> conf = 
configureLogHandler("org.apache.storm.utils.ShellUtilsTest$CustomShellLogHandler");
+        ShellLogHandler logHandler = ShellUtils.getLogHandler(conf);
+        assertTrue(logHandler.getClass() == CustomShellLogHandler.class);
+    }
+}
\ No newline at end of file

Reply via email to