[
https://issues.apache.org/jira/browse/STORM-1226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139644#comment-15139644
]
ASF GitHub Bot commented on STORM-1226:
---------------------------------------
Github user abhishekagarwal87 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1074#discussion_r52364169
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1370,9 +1444,821 @@ public static int toPositive(int number) {
public static RuntimeException wrapInRuntime(Exception e){
if (e instanceof RuntimeException){
return (RuntimeException)e;
- }else {
+ } else {
return new RuntimeException(e);
}
}
-}
+ /**
+ * Determines if a zip archive contains a particular directory.
+ *
+ * @param zipfile path to the zipped file
+ * @param target directory being looked for in the zip.
+ * @return boolean whether or not the directory exists in the zip.
+ */
+ public static boolean zipDoesContainDir(String zipfile, String target)
throws IOException {
+ List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new
ZipFile(zipfile).entries());
+
+ String targetDir = target + "/";
+ for(ZipEntry entry : entries) {
+ String name = entry.getName();
+ if(name.startsWith(targetDir)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Joins any number of maps together into a single map, combining
their values into
+ * a list, maintaining values in the order the maps were passed in.
Nulls are inserted
+ * for given keys when the map does not contain that key.
+ *
+ * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' =>
5}) ->
+ * {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null,
5]}
+ *
+ * @param maps variable number of maps to join - order affects order
of values in output.
+ * @return combined map
+ */
+ public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
+ Map<K, List<V>> ret = new HashMap<>();
+
+ Set<K> keys = new HashSet<>();
+
+ for(Map<K, V> map : maps) {
+ keys.addAll(map.keySet());
+ }
+
+ for(Map<K, V> m : maps) {
+ for(K key : keys) {
+ V value = m.get(key);
+
+ if(!ret.containsKey(key)) {
+ ret.put(key, new ArrayList<V>());
+ }
+
+ List<V> targetList = ret.get(key);
+ targetList.add(value);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Fills up chunks out of a collection (given a maximum amount of
chunks)
+ *
+ * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
+ * partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
+ * partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
+ * @param maxNumChunks the maximum number of chunks to return
+ * @param coll the collection to be chunked up
+ * @return a list of the chunks, which are themselves lists.
+ */
+ public static <T> List<List<T>> partitionFixed(int maxNumChunks,
Collection<T> coll) {
+ List<List<T>> ret = new ArrayList<>();
+
+ if(maxNumChunks == 0 || coll == null) {
+ return ret;
+ }
+
+ Map<Integer, Integer> parts = integerDivided(coll.size(),
maxNumChunks);
+
+ // Keys sorted in descending order
+ List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
+ Collections.sort(sortedKeys, Collections.reverseOrder());
+
+
+ Iterator<T> it = coll.iterator();
+ for(Integer chunkSize : sortedKeys) {
+ if(!it.hasNext()) { break; }
+ Integer times = parts.get(chunkSize);
+ for(int i = 0; i < times; i++) {
+ if(!it.hasNext()) { break; }
+ List<T> chunkList = new ArrayList<>();
+ for(int j = 0; j < chunkSize; j++) {
+ if(!it.hasNext()) { break; }
+ chunkList.add(it.next());
+ }
+ ret.add(chunkList);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Return a new instance of a pluggable specified in the conf.
+ * @param conf The conf to read from.
+ * @param configKey The key pointing to the pluggable class
+ * @return an instance of the class or null if it is not specified.
+ */
+ public static Object getConfiguredClass(Map conf, Object configKey) {
+ if (conf.containsKey(configKey)) {
+ return newInstance((String)conf.get(configKey));
+ }
+ return null;
+ }
+
+ public static String logsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR
+ "worker.log";
+ }
+
+ public static String eventLogsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR
+ "events.log";
+ }
+
+ public static Object readYamlFile(String yamlFile) {
+ try (FileReader reader = new FileReader(yamlFile)) {
+ return new Yaml(new SafeConstructor()).load(reader);
+ } catch(Exception ex) {
+ LOG.error("Failed to read yaml file.", ex);
+ }
+ return null;
+ }
+
+ public static void setupDefaultUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread thread, Throwable
thrown) {
+ try {
+ handleUncaughtException(thrown);
+ } catch (Error err) {
+ LOG.error("Received error in main thread..
terminating server...", err);
+ Runtime.getRuntime().exit(-2);
+ }
+ }
+ });
+ }
+
+ /**
+ * Creates a new map with a string value in the map replaced with an
+ * equivalently-lengthed string of '#'.
+ * @param m The map that a value will be redacted from
+ * @param key The key pointing to the value to be redacted
+ * @return a new map with the value redacted. The original map will
not be modified.
+ */
+ public static Map<Object, String> redactValue(Map<Object, String> m,
Object key) {
+ if(m.containsKey(key)) {
+ HashMap<Object, String> newMap = new HashMap<>(m);
+ String value = newMap.get(key);
+ String redacted = new String(new
char[value.length()]).replace("\0", "#");
+ newMap.put(key, redacted);
+ return newMap;
+ }
+ return m;
+ }
+
+ /**
+ * Make sure a given key name is valid for the storm config.
+ * Throw RuntimeException if the key isn't valid.
+ * @param name The name of the config key to check.
+ */
+ private static final Set<String> disallowedKeys = new
HashSet<>(Arrays.asList(new String[] {"/", ".", ":", "\\"}));
+ public static void validateKeyName(String name) {
+
+ for(String key : disallowedKeys) {
+ if( name.contains(key) ) {
+ throw new RuntimeException("Key name cannot contain any of
the following: " + disallowedKeys.toString());
+ }
+ }
+ if(name.trim().isEmpty()) {
+ throw new RuntimeException("Key name cannot be blank");
+ }
+ }
+
+ /**
+ * Find the first item of coll for which pred.test(...) returns true.
+ * @param pred The IPredicate to test for
+ * @param coll The Collection of items to search through.
+ * @return The first matching value in coll, or null if nothing
matches.
+ */
+ public static <T> T findFirst (IPredicate<T> pred, Collection<T> coll)
{
+ if(coll == null) {
+ return null;
+ }
+ for(T elem : coll) {
+ if (pred.test(elem)) {
+ return elem;
+ }
+ }
+ return null;
+ }
+
+ public static <T, U> T findFirst (IPredicate<T> pred, Map<U, T> map) {
+ if(map == null) {
+ return null;
+ }
+ return findFirst(pred, (Set<T>)map.entrySet());
+ }
+
+ public static String localHostname () throws UnknownHostException {
+ return _instance.localHostnameImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected String localHostnameImpl () throws UnknownHostException {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ }
+
+ private static String memoizedLocalHostnameString = null;
+
+ public static String memoizedLocalHostname () throws
UnknownHostException {
+ if (memoizedLocalHostnameString == null) {
+ memoizedLocalHostnameString = localHostname();
+ }
+ return memoizedLocalHostnameString;
+ }
+
+ /**
+ * Gets the storm.local.hostname value, or tries to figure out the
local hostname
+ * if it is not set in the config.
+ * @param conf The storm config to read from
+ * @return a string representation of the hostname.
+ */
+ public static String hostname (Map<String, Object> conf) throws
UnknownHostException {
+ if (conf == null) {
+ return memoizedLocalHostname();
+ }
+ Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+ if (hostnameString == null || hostnameString.equals("")) {
+ return memoizedLocalHostname();
+ }
+ return (String)hostnameString;
+ }
+
+ public static String uuid() {
+ return UUID.randomUUID().toString();
+ }
+
+ public static void exitProcess (int val, Object... msg) {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append("Halting process: ");
+ for (Object oneMessage: msg) {
+ errorMessage.append(oneMessage);
+ }
+ String combinedErrorMessage = errorMessage.toString();
+ LOG.error(combinedErrorMessage, new
RuntimeException(combinedErrorMessage));
+ Runtime.getRuntime().exit(val);
+ }
+
+ /**
+ * "{:a 1 :b 2} -> {1 :a 2 :b}"
+ *
+ * Note: Only one key wins if there are duplicate values.
+ * Which key wins is indeterminate:
+ * "{:a 1 :b 1} -> {1 :a} *or* {1 :b}"
+ */
+ public static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
+ Map<V, K> ret = new HashMap<V, K>();
+ for (Map.Entry<K, V> entry : map.entrySet()) {
+ ret.put(entry.getValue(), entry.getKey());
+ }
+ return ret;
+ }
+
+ /**
+ * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+ *
+ * Example usage in java:
+ * Map<Integer, String> tasks;
+ * Map<String, List<Integer>> componentTasks =
Utils.reverse_map(tasks);
+ *
+ * @param map
+ * @return
+ */
+ public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+ HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
+ if (map == null) {
+ return rtn;
+ }
+ for (Entry<K, V> entry : map.entrySet()) {
+ K key = entry.getKey();
+ V val = entry.getValue();
+ List<K> list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<K>();
+ rtn.put(entry.getValue(), list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+ /**
+ * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+ *
+ */
+ public static HashMap reverseMap(List listSeq) {
+ HashMap<Object, List<Object>> rtn = new HashMap();
+ if (listSeq == null) {
+ return rtn;
+ }
+ for (Object entry : listSeq) {
+ List listEntry = (List) entry;
+ Object key = listEntry.get(0);
+ Object val = listEntry.get(1);
+ List list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<Object>();
+ rtn.put(val, list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+
+ /**
+ * @return the pid of this JVM, because Java doesn't provide a real
way to do this.
+ */
+ public static String processPid() {
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ String[] split = name.split("@");
+ if (split.length != 2) {
+ throw new RuntimeException("Got unexpected process name: " +
name);
+ }
+ return split[0];
+ }
+
+ public static int execCommand(String... command) throws
ExecuteException, IOException {
+ //String[] cmdlist = command.split(" ");
+ CommandLine cmd = new CommandLine(command[0]);
+ for (int i = 1; i < command.length; i++) {
+ cmd.addArgument(command[i]);
+ }
+
+ DefaultExecutor exec = new DefaultExecutor();
+ return exec.execute(cmd);
+ }
+
+ /**
+ * Extra dir from the jar to destdir
+ *
+ * @param jarpath Path to the jar file
+ * @param dir Directory in the jar to pull out
+ * @param destdir Path to the directory where the extracted directory
will be put
+ *
+ */
+ public static void extractDirFromJar(String jarpath, String dir,
String destdir) {
+ try (JarFile jarFile = new JarFile(jarpath)) {
+ Enumeration<JarEntry> jarEnums = jarFile.entries();
+ while (jarEnums.hasMoreElements()) {
+ JarEntry entry = jarEnums.nextElement();
+ if (!entry.isDirectory() &&
entry.getName().startsWith(dir)) {
+ File aFile = new File(destdir, entry.getName());
+ aFile.getParentFile().mkdirs();
+ try (FileOutputStream out = new
FileOutputStream(aFile);
+ InputStream in = jarFile.getInputStream(entry)) {
+ IOUtils.copy(in, out);
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.info("Could not extract {} from {}", dir, jarpath);
+ }
+ }
+
+ public static void sendSignalToProcess(long lpid, int signum) throws
IOException {
+ String pid = Long.toString(lpid);
+ try {
+ if (isOnWindows()) {
+ if (signum == SIGKILL) {
+ execCommand("taskkill", "/f", "/pid", pid);
+ } else {
+ execCommand("taskkill", "/pid", pid);
+ }
+ } else {
+ execCommand("kill", "-" + signum, pid);
+ }
+ } catch (ExecuteException e) {
+ LOG.info("Error when trying to kill " + pid + ". Process is
probably already dead.");
+ } catch (IOException e) {
+ LOG.info("IOException Error when trying to kill " + pid + ".");
+ throw e;
+ }
+ }
+
+ public static void forceKillProcess (String pid) throws IOException {
+ sendSignalToProcess(Long.parseLong(pid), SIGKILL);
+ }
+
+ public static void killProcessWithSigTerm (String pid) throws
IOException {
+ sendSignalToProcess(Long.parseLong(pid), SIGTERM);
+ }
+
+ /**
+ * Adds the user supplied function as a shutdown hook for cleanup.
+ * Also adds a function that sleeps for a second and then sends kill -9
--- End diff --
It doesn't send kill -9 signal.
> Port backtype.storm.util to java
> --------------------------------
>
> Key: STORM-1226
> URL: https://issues.apache.org/jira/browse/STORM-1226
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Reza Farivar
> Labels: java-migration, jstorm-merger
>
> Port backtype.storm.util from clojure to java. In as many instances as
> possible the same interface should be maintained, and calls to clojure
> functions in the rest of the code should be replaces with calls to the
> corresponding java code.
> Some similar functions can be found at
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
> Although they are not identical.
> For function callbacks we may need to evaluate adding in appropriate callback
> interfaces instead. Please try to avoid using clojure internal java classes
> unless necessary.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)