Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1074#discussion_r52217262
  
    --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -1370,9 +1469,974 @@ public static int toPositive(int number) {
         public static RuntimeException wrapInRuntime(Exception e){
             if (e instanceof RuntimeException){
                 return (RuntimeException)e;
    -        }else {
    +        } else {
                 return new RuntimeException(e);
             }
         }
    -}
     
    +    /**
    +     * Determines if a zip archive contains a particular directory.
    +     *
    +     * @param zipfile path to the zipped file
    +     * @param target directory being looked for in the zip.
    +     * @return boolean whether or not the directory exists in the zip.
    +     */
    +    public static boolean zipDoesContainDir(String zipfile, String target) 
throws IOException {
    +        List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new 
ZipFile(zipfile).entries());
    +
    +        if(entries == null) {
    +            return false;
    +        }
    +
    +        String targetDir = target + "/";
    +        for(ZipEntry entry : entries) {
    +            String name = entry.getName();
    +            if(name.startsWith(targetDir)) {
    +                return true;
    +            }
    +        }
    +
    +        return false;
    +    }
    +
    +    /**
    +     * Joins any number of maps together into a single map, combining 
their values into
    +     * a list, maintaining values in the order the maps were passed in. 
Nulls are inserted
    +     * for given keys when the map does not contain that key.
    +     *
    +     * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 
5}) ->
    +     *      {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 
5]}
    +     *
    +     * @param maps variable number of maps to join - order affects order 
of values in output.
    +     * @return combined map
    +     */
    +    public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
    +        Map<K, List<V>> ret = new HashMap<>();
    +
    +        Set<K> keys = new HashSet<>();
    +
    +        for(Map<K, V> map : maps) {
    +            keys.addAll(map.keySet());
    +        }
    +
    +        for(Map<K, V> m : maps) {
    +            for(K key : keys) {
    +                V value = m.get(key);
    +
    +                if(!ret.containsKey(key)) {
    +                    ret.put(key, new ArrayList<V>());
    +                }
    +
    +                List<V> targetList = ret.get(key);
    +                targetList.add(value);
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Fills up chunks out of a collection (given a maximum amount of 
chunks)
    +     *
    +     * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
    +     *      partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
    +     *      partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
    +     * @param maxNumChunks the maximum number of chunks to return
    +     * @param coll the collection to be chunked up
    +     * @return a list of the chunks, which are themselves lists.
    +     */
    +    public static <T> List<List<T>> partitionFixed(int maxNumChunks, 
Collection<T> coll) {
    +        List<List<T>> ret = new ArrayList<>();
    +
    +        if(maxNumChunks == 0 || coll == null) {
    +            return ret;
    +        }
    +
    +        Map<Integer, Integer> parts = integerDivided(coll.size(), 
maxNumChunks);
    +
    +        // Keys sorted in descending order
    +        List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
    +        Collections.sort(sortedKeys, Collections.reverseOrder());
    +
    +
    +        Iterator<T> it = coll.iterator();
    +        for(Integer chunkSize : sortedKeys) {
    +            if(!it.hasNext()) { break; }
    +            Integer times = parts.get(chunkSize);
    +            for(int i = 0; i < times; i++) {
    +                if(!it.hasNext()) { break; }
    +                List<T> chunkList = new ArrayList<>();
    +                for(int j = 0; j < chunkSize; j++) {
    +                    if(!it.hasNext()) { break; }
    +                    chunkList.add(it.next());
    +                }
    +                ret.add(chunkList);
    +            }
    +        }
    +
    +        return ret;
    +    }
    +
    +    /**
    +     * Return a new instance of a pluggable specified in the conf.
    +     * @param conf The conf to read from.
    +     * @param configKey The key pointing to the pluggable class
    +     * @return an instance of the class or null if it is not specified.
    +     */
    +    public static Object getConfiguredClass(Map conf, Object configKey) {
    +        if (conf.containsKey(configKey)) {
    +            return newInstance((String)conf.get(configKey));
    +        }
    +        return null;
    +    }
    +
    +    public static String logsFilename(String stormId, int port) {
    +        return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + 
FILE_PATH_SEPARATOR + "worker.log";
    +    }
    +
    +    public static String eventLogsFilename(String stormId, int port) {
    +        return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + 
FILE_PATH_SEPARATOR + "events.log";
    +    }
    +
    +    public static Object readYamlFile(String yamlFile) {
    +        try (FileReader reader = new FileReader(yamlFile)) {
    +            return new Yaml(new SafeConstructor()).load(reader);
    +        }
    +        catch(Exception ex) {
    +            LOG.error("Failed to read yaml file.", ex);
    +        }
    +        return null;
    +    }
    +
    +    public static void setupDefaultUncaughtExceptionHandler() {
    +        Thread.setDefaultUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
    +                public void uncaughtException(Thread thread, Throwable 
thrown) {
    +                    try {
    +                        handleUncaughtException(thrown);
    +                    }
    +                    catch (Error err) {
    +                        LOG.error("Received error in main thread.. 
terminating server...", err);
    +                        Runtime.getRuntime().exit(-2);
    +                    }
    +                }
    +            });
    +    }
    +
    +    /**
    +     * Creates a new map with a string value in the map replaced with an 
    +     * equivalently-lengthed string of '#'.
    +     * @param m The map that a value will be redacted from
    +     * @param key The key pointing to the value to be redacted
    +     * @return a new map with the value redacted. The original map will 
not be modified.
    +     */
    +    public static Map redactValue(Map<Object, String> m, Object key) { 
    +        if(m.containsKey(key)) {
    +            HashMap<Object, String> newMap = new HashMap<>(m);
    +            String value = newMap.get(key);
    +            String redacted = new String(new 
char[value.length()]).replace("\0", "#");
    +            newMap.put(key, redacted);
    +            return newMap;
    +        }
    +        return m;
    +    }
    +
    +    public static void logThriftAccess(Integer requestId, InetAddress 
remoteAddress, Principal principal, String operation) {
    +        new ThriftAccessLogger().log(
    +            String.format("Request ID: {} access from: {} principal: {} 
operation: {}",
    +                          requestId, remoteAddress, principal, operation));
    +    }
    +
    +    /**
    +     * Make sure a given key name is valid for the storm config. 
    +     * Throw RuntimeException if the key isn't valid.
    +     * @param name The name of the config key to check.
    +     */
    +    public static void validateKeyName(String name) {
    +        Set<String> disallowedKeys = new HashSet<>();
    +        disallowedKeys.add("/");
    +        disallowedKeys.add(".");
    +        disallowedKeys.add(":");
    +        disallowedKeys.add("\\");
    +
    +        for(String key : disallowedKeys) {
    +            if( name.contains(key) ) {
    +                throw new RuntimeException("Key name cannot contain any of 
the following: " + disallowedKeys.toString());
    +            }
    +        }
    +        if(name.trim().isEmpty()) {
    +            throw new RuntimeException("Key name cannot be blank");
    +        }
    +    }
    +
    +    //Everything from here on is translated from the old util.clj 
(storm-core/src/clj/backtype.storm/util.clj)
    +
    +    public static final boolean IS_ON_WINDOWS = 
"Windows_NT".equals(System.getenv("OS"));
    +
    +    public static final String FILE_PATH_SEPARATOR = 
System.getProperty("file.separator");
    +
    +    public static final String CLASS_PATH_SEPARATOR = 
System.getProperty("path.separator");
    +
    +    public static final int SIGKILL = 9;
    +    public static final int SIGTERM = 15;
    +
    +
    +
    +    /**
    +     * Find the first item of coll for which pred.test(...) returns true.
    +     * @param pred The IPredicate to test for
    +     * @param coll The Collection of items to search through.
    +     * @return The first matching value in coll, or null if nothing 
matches.
    +     */
    +    public static Object findFirst (IPredicate pred, Collection coll) {
    +        if (coll == null || pred == null) {
    +            return null;
    +        } else {
    +            Iterator<Object> iter = coll.iterator();
    +            while(iter != null && iter.hasNext()) {
    +                Object obj = iter.next();
    +                if (pred.test(obj)) {
    +                    return obj;
    +                }
    +            }
    +            return null;
    +        }
    +    }
    +
    +    public static Object findFirst (IPredicate pred, Map map) {
    +        if (map == null || pred == null) {
    +            return null;
    +        } else {
    +            Iterator<Object> iter = map.entrySet().iterator();
    +            while(iter != null && iter.hasNext()) {
    +                Object obj = iter.next();
    +                if (pred.test(obj)) {
    +                    return obj;
    +                }
    +            }
    +            return null;
    +        }
    +    }
    +
    +    public static String localHostname () throws UnknownHostException {
    +        return _instance.localHostnameImpl();
    +    }
    +
    +    // Non-static impl methods exist for mocking purposes.
    +    protected String localHostnameImpl () throws UnknownHostException {
    +        return InetAddress.getLocalHost().getCanonicalHostName();
    +    }
    +
    +    private static String memoizedLocalHostnameString = null;
    +
    +    public static String memoizedLocalHostname () throws 
UnknownHostException {
    +        if (memoizedLocalHostnameString == null) {
    +            memoizedLocalHostnameString = localHostname();
    +        }
    +        return memoizedLocalHostnameString;
    +    }
    +
    +    /**
    +     * Gets the storm.local.hostname value, or tries to figure out the 
local hostname
    +     * if it is not set in the config.
    +     * @param conf The storm config to read from
    +     * @return a string representation of the hostname.
    +    */
    +    public static String hostname (Map<String, Object> conf) throws 
UnknownHostException  {
    +        if (conf == null) {
    +            return memoizedLocalHostname();
    +        }
    +        Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
    +        if (hostnameString == null ) {
    +            return memoizedLocalHostname();
    +        }
    +        if (hostnameString.equals("")) {
    +            return memoizedLocalHostname();
    +        }
    +        return hostnameString.toString();
    +    }
    +
    +    public static String uuid() {
    +        return UUID.randomUUID().toString();
    +    }
    +
    +    public static long secsToMillisLong(double secs) {
    +        return (long) (1000 * secs);
    +    }
    +
    +    public static Vector<String> tokenizePath (String path) {
    +        String[] tokens = path.split("/");
    +        Vector<String> outputs = new Vector<String>();
    +        if (tokens == null || tokens.length == 0) {
    +            return null;
    +        }
    +        for (String tok: tokens) {
    +            if (!tok.isEmpty()) {
    +                outputs.add(tok);
    +            }
    +        }
    +        return outputs;
    +    }
    +
    +    public static String parentPath(String path) {
    +        if (path == null) {
    +            return "/";
    +        }
    +        Vector<String> tokens = tokenizePath(path);
    +        int length = tokens.size();
    +        if (length == 0) {
    +            return "/";
    +        }
    +        String output = "";
    +        for (int i = 0; i < length - 1; i++) {  //length - 1 to mimic 
"butlast" from the old clojure code
    +            output = output + "/" + tokens.get(i);
    +        }
    +        return output;
    +    }
    +
    +    public static String toksToPath (Vector<String> toks) {
    +        if (toks == null || toks.size() == 0) {
    +            return "/";
    +        }
    +
    +        String output = "";
    +        for (int i = 0; i < toks.size(); i++) {
    +            output = output + "/" + toks.get(i);
    +        }
    +        return output;
    +    }
    +    public static String normalizePath (String path) {
    +        return toksToPath(tokenizePath(path));
    +    }
    +
    +    public static void exitProcess (int val, Object... msg) {
    +        StringBuilder errorMessage = new StringBuilder();
    +        errorMessage.append("halting process: ");
    +        for (Object oneMessage: msg) {
    +            errorMessage.append(oneMessage);
    +        }
    +        String combinedErrorMessage = errorMessage.toString();
    +        LOG.error(combinedErrorMessage, new 
RuntimeException(combinedErrorMessage));
    +        Runtime.getRuntime().exit(val);
    +    }
    +
    +    public static Object defaulted(Object val, Object defaultObj) {
    +        if (val != null) {
    +            return val;
    +        } else {
    +            return defaultObj;
    +        }
    +    }
    +
    +    /**
    +     * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
    +     *
    +     * Example usage in java:
    +     *  Map<Integer, String> tasks;
    +     *  Map<String, List<Integer>> componentTasks = 
Utils.reverse_map(tasks);
    +     *
    +     * @param map
    +     * @return
    +     */
    +    public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
    +        HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
    +        if (map == null) {
    +            return rtn;
    +        }
    +        for (Entry<K, V> entry : map.entrySet()) {
    +            K key = entry.getKey();
    +            V val = entry.getValue();
    +            List<K> list = rtn.get(val);
    +            if (list == null) {
    +                list = new ArrayList<K>();
    +                rtn.put(entry.getValue(), list);
    +            }
    +            list.add(key);
    +        }
    +        return rtn;
    +    }
    +
    +    /**
    +     * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
    +     *
    +     */
    +    public static HashMap reverseMap(List listSeq) {
    +        HashMap<Object, List<Object>> rtn = new HashMap();
    +        if (listSeq == null) {
    +            return rtn;
    +        }
    +        for (Object entry : listSeq) {
    +            List listEntry = (List) entry;
    +            Object key = listEntry.get(0);
    +            Object val = listEntry.get(1);
    +            List list = rtn.get(val);
    +            if (list == null) {
    +                list = new ArrayList<Object>();
    +                rtn.put(val, list);
    +            }
    +            list.add(key);
    +        }
    +        return rtn;
    +    }
    +
    +
    +    /**
    +     * Gets the pid of this JVM, because Java doesn't provide a real way 
to do this.
    +     *
    +     * @return
    +     */
    +    public static String processPid() throws RuntimeException {
    +        String name = ManagementFactory.getRuntimeMXBean().getName();
    +        String[] split = name.split("@");
    +        if (split.length != 2) {
    +            throw new RuntimeException("Got unexpected process name: " + 
name);
    +        }
    +        return split[0];
    +    }
    +
    +    public static int execCommand(String command) throws ExecuteException, 
IOException {
    +        String[] cmdlist = command.split(" ");
    +        CommandLine cmd = new CommandLine(cmdlist[0]);
    +        for (int i = 1; i < cmdlist.length; i++) {
    +            cmd.addArgument(cmdlist[i]);
    +        }
    +
    +        DefaultExecutor exec = new DefaultExecutor();
    +        return exec.execute(cmd);
    +    }
    +
    +    /**
    +     * Extra dir from the jar to destdir
    +     *
    +     * @param jarpath
    +     * @param dir
    +     * @param destdir
    +     *
    +    (with-open [jarpath (ZipFile. jarpath)]
    --- End diff --
    
    this should be removed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to