Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1074#discussion_r52220553 --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java --- @@ -1370,9 +1469,974 @@ public static int toPositive(int number) { public static RuntimeException wrapInRuntime(Exception e){ if (e instanceof RuntimeException){ return (RuntimeException)e; - }else { + } else { return new RuntimeException(e); } } -} + /** + * Determines if a zip archive contains a particular directory. + * + * @param zipfile path to the zipped file + * @param target directory being looked for in the zip. + * @return boolean whether or not the directory exists in the zip. + */ + public static boolean zipDoesContainDir(String zipfile, String target) throws IOException { + List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new ZipFile(zipfile).entries()); + + if(entries == null) { + return false; + } + + String targetDir = target + "/"; + for(ZipEntry entry : entries) { + String name = entry.getName(); + if(name.startsWith(targetDir)) { + return true; + } + } + + return false; + } + + /** + * Joins any number of maps together into a single map, combining their values into + * a list, maintaining values in the order the maps were passed in. Nulls are inserted + * for given keys when the map does not contain that key. + * + * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 5}) -> + * {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 5]} + * + * @param maps variable number of maps to join - order affects order of values in output. + * @return combined map + */ + public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... 
maps) { + Map<K, List<V>> ret = new HashMap<>(); + + Set<K> keys = new HashSet<>(); + + for(Map<K, V> map : maps) { + keys.addAll(map.keySet()); + } + + for(Map<K, V> m : maps) { + for(K key : keys) { + V value = m.get(key); + + if(!ret.containsKey(key)) { + ret.put(key, new ArrayList<V>()); + } + + List<V> targetList = ret.get(key); + targetList.add(value); + } + } + return ret; + } + + /** + * Fills up chunks out of a collection (given a maximum amount of chunks) + * + * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]] + * partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]] + * partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]] + * @param maxNumChunks the maximum number of chunks to return + * @param coll the collection to be chunked up + * @return a list of the chunks, which are themselves lists. + */ + public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) { + List<List<T>> ret = new ArrayList<>(); + + if(maxNumChunks == 0 || coll == null) { + return ret; + } + + Map<Integer, Integer> parts = integerDivided(coll.size(), maxNumChunks); + + // Keys sorted in descending order + List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet()); + Collections.sort(sortedKeys, Collections.reverseOrder()); + + + Iterator<T> it = coll.iterator(); + for(Integer chunkSize : sortedKeys) { + if(!it.hasNext()) { break; } + Integer times = parts.get(chunkSize); + for(int i = 0; i < times; i++) { + if(!it.hasNext()) { break; } + List<T> chunkList = new ArrayList<>(); + for(int j = 0; j < chunkSize; j++) { + if(!it.hasNext()) { break; } + chunkList.add(it.next()); + } + ret.add(chunkList); + } + } + + return ret; + } + + /** + * Return a new instance of a pluggable specified in the conf. + * @param conf The conf to read from. + * @param configKey The key pointing to the pluggable class + * @return an instance of the class or null if it is not specified. 
+ */ + public static Object getConfiguredClass(Map conf, Object configKey) { + if (conf.containsKey(configKey)) { + return newInstance((String)conf.get(configKey)); + } + return null; + } + + public static String logsFilename(String stormId, int port) { + return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + FILE_PATH_SEPARATOR + "worker.log"; + } + + public static String eventLogsFilename(String stormId, int port) { + return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + FILE_PATH_SEPARATOR + "events.log"; + } + + public static Object readYamlFile(String yamlFile) { + try (FileReader reader = new FileReader(yamlFile)) { + return new Yaml(new SafeConstructor()).load(reader); + } + catch(Exception ex) { + LOG.error("Failed to read yaml file.", ex); + } + return null; + } + + public static void setupDefaultUncaughtExceptionHandler() { + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread thread, Throwable thrown) { + try { + handleUncaughtException(thrown); + } + catch (Error err) { + LOG.error("Received error in main thread.. terminating server...", err); + Runtime.getRuntime().exit(-2); + } + } + }); + } + + /** + * Creates a new map with a string value in the map replaced with an + * equivalently-lengthed string of '#'. + * @param m The map that a value will be redacted from + * @param key The key pointing to the value to be redacted + * @return a new map with the value redacted. The original map will not be modified. 
+ */ + public static Map redactValue(Map<Object, String> m, Object key) { + if(m.containsKey(key)) { + HashMap<Object, String> newMap = new HashMap<>(m); + String value = newMap.get(key); + String redacted = new String(new char[value.length()]).replace("\0", "#"); + newMap.put(key, redacted); + return newMap; + } + return m; + } + + public static void logThriftAccess(Integer requestId, InetAddress remoteAddress, Principal principal, String operation) { + new ThriftAccessLogger().log( + String.format("Request ID: {} access from: {} principal: {} operation: {}", + requestId, remoteAddress, principal, operation)); + } + + /** + * Make sure a given key name is valid for the storm config. + * Throw RuntimeException if the key isn't valid. + * @param name The name of the config key to check. + */ + public static void validateKeyName(String name) { + Set<String> disallowedKeys = new HashSet<>(); + disallowedKeys.add("/"); + disallowedKeys.add("."); + disallowedKeys.add(":"); + disallowedKeys.add("\\"); + + for(String key : disallowedKeys) { + if( name.contains(key) ) { + throw new RuntimeException("Key name cannot contain any of the following: " + disallowedKeys.toString()); + } + } + if(name.trim().isEmpty()) { + throw new RuntimeException("Key name cannot be blank"); + } + } + + //Everything from here on is translated from the old util.clj (storm-core/src/clj/backtype.storm/util.clj) + + public static final boolean IS_ON_WINDOWS = "Windows_NT".equals(System.getenv("OS")); + + public static final String FILE_PATH_SEPARATOR = System.getProperty("file.separator"); + + public static final String CLASS_PATH_SEPARATOR = System.getProperty("path.separator"); + + public static final int SIGKILL = 9; + public static final int SIGTERM = 15; + + + + /** + * Find the first item of coll for which pred.test(...) returns true. + * @param pred The IPredicate to test for + * @param coll The Collection of items to search through. 
+ * @return The first matching value in coll, or null if nothing matches. + */ + public static Object findFirst (IPredicate pred, Collection coll) { + if (coll == null || pred == null) { + return null; + } else { + Iterator<Object> iter = coll.iterator(); --- End diff -- Let's not use the iterator directly; a for-each loop is simpler. Also, in the old code a null pred would throw an NPE, so we should keep that behavior rather than silently returning null. ``` public static <T> T findFirst (IPredicate pred, Iterable<T> coll) { if (coll != null) { for (T obj: coll) { if (pred.test(obj)) { return obj; } } } return null; } ```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---