Repository: storm
Updated Branches:
  refs/heads/master 6c28dce02 -> 16c3b4f4c


STORM-188. Allow the user to specify the full configuration path when running
the storm command.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a2c4129
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a2c4129
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a2c4129

Branch: refs/heads/master
Commit: 9a2c4129b145b901f320238ea36a282ff7f44d0c
Parents: 330e135
Author: Sriharsha Chintalapani <[email protected]>
Authored: Tue Mar 31 11:37:30 2015 -0700
Committer: Sriharsha Chintalapani <[email protected]>
Committed: Tue Mar 31 11:37:30 2015 -0700

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/utils/Utils.java     | 119 ++++++++++++-------
 1 file changed, 77 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9a2c4129/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java 
b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 4123f73..7e2d97b 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -45,6 +45,8 @@ import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.io.File;
+import java.io.FileInputStream;
 import java.util.*;
 
 public class Utils {
@@ -66,7 +68,7 @@ public class Utils {
             throw new RuntimeException(e);
         }
     }
- 
+
     public static byte[] serialize(Object obj) {
         return serializationDelegate.serialize(obj);
     }
@@ -120,7 +122,7 @@ public class Utils {
             throw new RuntimeException(e);
         }
     }
-    
+
     public static List<URL> findResources(String name) {
         try {
             Enumeration<URL> resources = 
Thread.currentThread().getContextClassLoader().getResources(name);
@@ -135,35 +137,68 @@ public class Utils {
     }
 
     public static Map findAndReadConfigFile(String name, boolean mustExist) {
+        InputStream in = null;
+        Boolean confFileEmpty = false;
         try {
-            HashSet<URL> resources = new HashSet<URL>(findResources(name));
-            if(resources.isEmpty()) {
-                if(mustExist) throw new RuntimeException("Could not find 
config file on classpath " + name);
-                else return new HashMap();
-            }
-            if(resources.size() > 1) {
-                throw new RuntimeException("Found multiple " + name + " 
resources. You're probably bundling the Storm jars with your topology jar. "
-                  + resources);
-            }
-            URL resource = resources.iterator().next();
-            Yaml yaml = new Yaml(new SafeConstructor());
-            Map ret = null;
-            InputStream input = resource.openStream();
-            try {
-                ret = (Map) yaml.load(new InputStreamReader(input));
-            } finally {
-                input.close();
+            in = getConfigFileInputStream(name);
+            if (null != in) {
+                Yaml yaml = new Yaml(new SafeConstructor());
+                Map ret = (Map) yaml.load(new InputStreamReader(in));
+                if (null != ret) {
+                    return new HashMap(ret);
+                } else {
+                    confFileEmpty = true;
+                }
             }
-            if(ret==null) ret = new HashMap();
-            
 
-            return new HashMap(ret);
-            
+            if (mustExist) {
+                if(confFileEmpty)
+                    throw new RuntimeException("Config file " + name + " 
doesn't have any valid storm configs");
+                else
+                    throw new RuntimeException("Could not find config file on 
classpath " + name);
+            } else {
+                return new HashMap();
+            }
         } catch (IOException e) {
             throw new RuntimeException(e);
+        } finally {
+            if (null != in) {
+                try {
+                    in.close();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
     }
 
+    private static InputStream getConfigFileInputStream(String configFilePath)
+            throws IOException {
+        if (null == configFilePath) {
+            throw new IOException(
+                    "Could not find config file, name not specified");
+        }
+
+        HashSet<URL> resources = new 
HashSet<URL>(findResources(configFilePath));
+        if (resources.isEmpty()) {
+            File configFile = new File(configFilePath);
+            if (configFile.exists()) {
+                return new FileInputStream(configFile);
+            }
+        } else if (resources.size() > 1) {
+            throw new IOException(
+                    "Found multiple " + configFilePath
+                            + " resources. You're probably bundling the Storm 
jars with your topology jar. "
+                            + resources);
+        } else {
+            LOG.info("Using "+configFilePath+" from resources");
+            URL resource = resources.iterator().next();
+            return resource.openStream();
+        }
+        return null;
+    }
+
+
     public static Map findAndReadConfigFile(String name) {
        return findAndReadConfigFile(name, true);
     }
@@ -171,7 +206,7 @@ public class Utils {
     public static Map readDefaultConfig() {
         return findAndReadConfigFile("defaults.yaml", true);
     }
-    
+
     public static Map readCommandLineOpts() {
         Map ret = new HashMap();
         String commandOptions = System.getProperty("storm.options");
@@ -205,7 +240,7 @@ public class Utils {
         ret.putAll(readCommandLineOpts());
         return ret;
     }
-    
+
     private static Object normalizeConf(Object conf) {
         if(conf==null) return new HashMap();
         if(conf instanceof Map) {
@@ -230,7 +265,7 @@ public class Utils {
             return conf;
         }
     }
-    
+
     public static boolean isValidConf(Map<String, Object> stormConf) {
         return normalizeConf(stormConf).equals(normalizeConf((Map) 
JSONValue.parse(JSONValue.toJSONString(stormConf))));
     }
@@ -252,7 +287,7 @@ public class Utils {
         }
         return ret;
     }
-    
+
     public static List<Object> tuple(Object... values) {
         List<Object> ret = new ArrayList<Object>();
         for(Object v: values) {
@@ -272,20 +307,20 @@ public class Utils {
         }
         out.close();
     }
-    
+
     public static IFn loadClojureFn(String namespace, String name) {
         try {
           clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + 
")"));
         } catch (Exception e) {
           //if playing from the repl and defining functions, file won't exist
         }
-        return (IFn) RT.var(namespace, name).deref();        
+        return (IFn) RT.var(namespace, name).deref();
     }
-    
+
     public static boolean isSystemId(String id) {
         return id.startsWith("__");
     }
-        
+
     public static <K, V> Map<V, K> reverseMap(Map<K, V> map) {
         Map<V, K> ret = new HashMap<V, K>();
         for(K key: map.keySet()) {
@@ -293,7 +328,7 @@ public class Utils {
         }
         return ret;
     }
-    
+
     public static ComponentCommon getComponentCommon(StormTopology topology, 
String id) {
         if(topology.get_spouts().containsKey(id)) {
             return topology.get_spouts().get(id).get_common();
@@ -306,7 +341,7 @@ public class Utils {
         }
         throw new IllegalArgumentException("Could not find component with id " 
+ id);
     }
-    
+
     public static Integer getInt(Object o) {
       Integer result = getInt(o, null);
       if (null == result) {
@@ -314,7 +349,7 @@ public class Utils {
       }
       return result;
     }
-    
+
     public static Integer getInt(Object o, Integer defaultValue) {
       if (null == o) {
         return defaultValue;
@@ -340,18 +375,18 @@ public class Utils {
       if (null == o) {
         return defaultValue;
       }
-      
+
       if(o instanceof Boolean) {
           return (Boolean) o;
       } else {
           throw new IllegalArgumentException("Don't know how to convert " + o 
+ " + to boolean");
       }
     }
-    
+
     public static long secureRandomLong() {
         return UUID.randomUUID().getLeastSignificantBits();
     }
-    
+
     public static CuratorFramework newCurator(Map conf, List<String> servers, 
Object port, String root) {
         return newCurator(conf, servers, port, root, null);
     }
@@ -365,7 +400,7 @@ public class Utils {
         CuratorFrameworkFactory.Builder builder = 
CuratorFrameworkFactory.builder();
 
         setupBuilder(builder, zkStr, conf, auth);
-        
+
         return builder.build();
     }
 
@@ -398,8 +433,8 @@ public class Utils {
         CuratorFramework ret = newCurator(conf, servers, port, auth);
         ret.start();
         return ret;
-    }    
-    
+    }
+
     /**
      *
 (defn integer-divided [sum num-pieces]
@@ -412,9 +447,9 @@ public class Utils {
       )))
      * @param sum
      * @param numPieces
-     * @return 
+     * @return
      */
-    
+
     public static TreeMap<Integer, Integer> integerDivided(int sum, int 
numPieces) {
         int base = sum / numPieces;
         int numInc = sum % numPieces;

Reply via email to