[
https://issues.apache.org/jira/browse/STORM-1226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15137808#comment-15137808
]
ASF GitHub Bot commented on STORM-1226:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1074#discussion_r52233202
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1370,9 +1469,974 @@ public static int toPositive(int number) {
public static RuntimeException wrapInRuntime(Exception e){
if (e instanceof RuntimeException){
return (RuntimeException)e;
- }else {
+ } else {
return new RuntimeException(e);
}
}
-}
+ /**
+ * Determines if a zip archive contains a particular directory.
+ *
+ * @param zipfile path to the zipped file
+ * @param target directory being looked for in the zip.
+ * @return boolean whether or not the directory exists in the zip.
+ */
+ public static boolean zipDoesContainDir(String zipfile, String target)
throws IOException {
+ List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new
ZipFile(zipfile).entries());
+
+ if(entries == null) {
+ return false;
+ }
+
+ String targetDir = target + "/";
+ for(ZipEntry entry : entries) {
+ String name = entry.getName();
+ if(name.startsWith(targetDir)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Joins any number of maps together into a single map, combining
their values into
+ * a list, maintaining values in the order the maps were passed in.
Nulls are inserted
+ * for given keys when the map does not contain that key.
+ *
+ * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' =>
5}) ->
+ * {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null,
5]}
+ *
+ * @param maps variable number of maps to join - order affects order
of values in output.
+ * @return combined map
+ */
+ public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
+ Map<K, List<V>> ret = new HashMap<>();
+
+ Set<K> keys = new HashSet<>();
+
+ for(Map<K, V> map : maps) {
+ keys.addAll(map.keySet());
+ }
+
+ for(Map<K, V> m : maps) {
+ for(K key : keys) {
+ V value = m.get(key);
+
+ if(!ret.containsKey(key)) {
+ ret.put(key, new ArrayList<V>());
+ }
+
+ List<V> targetList = ret.get(key);
+ targetList.add(value);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Fills up chunks out of a collection (given a maximum amount of
chunks)
+ *
+ * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
+ * partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
+ * partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
+ * @param maxNumChunks the maximum number of chunks to return
+ * @param coll the collection to be chunked up
+ * @return a list of the chunks, which are themselves lists.
+ */
+ public static <T> List<List<T>> partitionFixed(int maxNumChunks,
Collection<T> coll) {
+ List<List<T>> ret = new ArrayList<>();
+
+ if(maxNumChunks == 0 || coll == null) {
+ return ret;
+ }
+
+ Map<Integer, Integer> parts = integerDivided(coll.size(),
maxNumChunks);
+
+ // Keys sorted in descending order
+ List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
+ Collections.sort(sortedKeys, Collections.reverseOrder());
+
+
+ Iterator<T> it = coll.iterator();
+ for(Integer chunkSize : sortedKeys) {
+ if(!it.hasNext()) { break; }
+ Integer times = parts.get(chunkSize);
+ for(int i = 0; i < times; i++) {
+ if(!it.hasNext()) { break; }
+ List<T> chunkList = new ArrayList<>();
+ for(int j = 0; j < chunkSize; j++) {
+ if(!it.hasNext()) { break; }
+ chunkList.add(it.next());
+ }
+ ret.add(chunkList);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Return a new instance of a pluggable specified in the conf.
+ * @param conf The conf to read from.
+ * @param configKey The key pointing to the pluggable class
+ * @return an instance of the class or null if it is not specified.
+ */
+ public static Object getConfiguredClass(Map conf, Object configKey) {
+ if (conf.containsKey(configKey)) {
+ return newInstance((String)conf.get(configKey));
+ }
+ return null;
+ }
+
+ public static String logsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) +
FILE_PATH_SEPARATOR + "worker.log";
+ }
+
+ public static String eventLogsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) +
FILE_PATH_SEPARATOR + "events.log";
+ }
+
+ public static Object readYamlFile(String yamlFile) {
+ try (FileReader reader = new FileReader(yamlFile)) {
+ return new Yaml(new SafeConstructor()).load(reader);
+ }
+ catch(Exception ex) {
+ LOG.error("Failed to read yaml file.", ex);
+ }
+ return null;
+ }
+
+ public static void setupDefaultUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread thread, Throwable
thrown) {
+ try {
+ handleUncaughtException(thrown);
+ }
+ catch (Error err) {
+ LOG.error("Received error in main thread..
terminating server...", err);
+ Runtime.getRuntime().exit(-2);
+ }
+ }
+ });
+ }
+
+ /**
+ * Creates a new map with a string value in the map replaced with an
+ * equivalently-lengthed string of '#'.
+ * @param m The map that a value will be redacted from
+ * @param key The key pointing to the value to be redacted
+ * @return a new map with the value redacted. The original map will
not be modified.
+ */
+ public static Map redactValue(Map<Object, String> m, Object key) {
+ if(m.containsKey(key)) {
+ HashMap<Object, String> newMap = new HashMap<>(m);
+ String value = newMap.get(key);
+ String redacted = new String(new
char[value.length()]).replace("\0", "#");
+ newMap.put(key, redacted);
+ return newMap;
+ }
+ return m;
+ }
+
+ public static void logThriftAccess(Integer requestId, InetAddress
remoteAddress, Principal principal, String operation) {
+ new ThriftAccessLogger().log(
+ String.format("Request ID: {} access from: {} principal: {}
operation: {}",
+ requestId, remoteAddress, principal, operation));
+ }
+
+ /**
+ * Make sure a given key name is valid for the storm config.
+ * Throw RuntimeException if the key isn't valid.
+ * @param name The name of the config key to check.
+ */
+ public static void validateKeyName(String name) {
+ Set<String> disallowedKeys = new HashSet<>();
+ disallowedKeys.add("/");
+ disallowedKeys.add(".");
+ disallowedKeys.add(":");
+ disallowedKeys.add("\\");
+
+ for(String key : disallowedKeys) {
+ if( name.contains(key) ) {
+ throw new RuntimeException("Key name cannot contain any of
the following: " + disallowedKeys.toString());
+ }
+ }
+ if(name.trim().isEmpty()) {
+ throw new RuntimeException("Key name cannot be blank");
+ }
+ }
+
+ //Everything from here on is translated from the old util.clj
(storm-core/src/clj/backtype.storm/util.clj)
+
+ public static final boolean IS_ON_WINDOWS =
"Windows_NT".equals(System.getenv("OS"));
+
+ public static final String FILE_PATH_SEPARATOR =
System.getProperty("file.separator");
+
+ public static final String CLASS_PATH_SEPARATOR =
System.getProperty("path.separator");
+
+ public static final int SIGKILL = 9;
+ public static final int SIGTERM = 15;
+
+
+
+ /**
+ * Find the first item of coll for which pred.test(...) returns true.
+ * @param pred The IPredicate to test for
+ * @param coll The Collection of items to search through.
+ * @return The first matching value in coll, or null if nothing
matches.
+ */
+ public static Object findFirst (IPredicate pred, Collection coll) {
+ if (coll == null || pred == null) {
+ return null;
+ } else {
+ Iterator<Object> iter = coll.iterator();
+ while(iter != null && iter.hasNext()) {
+ Object obj = iter.next();
+ if (pred.test(obj)) {
+ return obj;
+ }
+ }
+ return null;
+ }
+ }
+
+ public static Object findFirst (IPredicate pred, Map map) {
+ if (map == null || pred == null) {
+ return null;
+ } else {
+ Iterator<Object> iter = map.entrySet().iterator();
+ while(iter != null && iter.hasNext()) {
+ Object obj = iter.next();
+ if (pred.test(obj)) {
+ return obj;
+ }
+ }
+ return null;
+ }
+ }
+
+ public static String localHostname () throws UnknownHostException {
+ return _instance.localHostnameImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected String localHostnameImpl () throws UnknownHostException {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ }
+
+ private static String memoizedLocalHostnameString = null;
+
+ public static String memoizedLocalHostname () throws
UnknownHostException {
+ if (memoizedLocalHostnameString == null) {
+ memoizedLocalHostnameString = localHostname();
+ }
+ return memoizedLocalHostnameString;
+ }
+
+ /**
+ * Gets the storm.local.hostname value, or tries to figure out the
local hostname
+ * if it is not set in the config.
+ * @param conf The storm config to read from
+ * @return a string representation of the hostname.
+ */
+ public static String hostname (Map<String, Object> conf) throws
UnknownHostException {
+ if (conf == null) {
+ return memoizedLocalHostname();
+ }
+ Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+ if (hostnameString == null ) {
+ return memoizedLocalHostname();
+ }
+ if (hostnameString.equals("")) {
+ return memoizedLocalHostname();
+ }
+ return hostnameString.toString();
+ }
+
+ public static String uuid() {
+ return UUID.randomUUID().toString();
+ }
+
+ public static long secsToMillisLong(double secs) {
+ return (long) (1000 * secs);
+ }
+
+ public static Vector<String> tokenizePath (String path) {
+ String[] tokens = path.split("/");
+ Vector<String> outputs = new Vector<String>();
+ if (tokens == null || tokens.length == 0) {
+ return null;
+ }
+ for (String tok: tokens) {
+ if (!tok.isEmpty()) {
+ outputs.add(tok);
+ }
+ }
+ return outputs;
+ }
+
+ public static String parentPath(String path) {
+ if (path == null) {
+ return "/";
+ }
+ Vector<String> tokens = tokenizePath(path);
+ int length = tokens.size();
+ if (length == 0) {
+ return "/";
+ }
+ String output = "";
+ for (int i = 0; i < length - 1; i++) { //length - 1 to mimic
"butlast" from the old clojure code
+ output = output + "/" + tokens.get(i);
+ }
+ return output;
+ }
+
+ public static String toksToPath (Vector<String> toks) {
+ if (toks == null || toks.size() == 0) {
+ return "/";
+ }
+
+ String output = "";
+ for (int i = 0; i < toks.size(); i++) {
+ output = output + "/" + toks.get(i);
+ }
+ return output;
+ }
+ public static String normalizePath (String path) {
+ return toksToPath(tokenizePath(path));
+ }
+
+ public static void exitProcess (int val, Object... msg) {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append("halting process: ");
+ for (Object oneMessage: msg) {
+ errorMessage.append(oneMessage);
+ }
+ String combinedErrorMessage = errorMessage.toString();
+ LOG.error(combinedErrorMessage, new
RuntimeException(combinedErrorMessage));
+ Runtime.getRuntime().exit(val);
+ }
+
+ public static Object defaulted(Object val, Object defaultObj) {
+ if (val != null) {
+ return val;
+ } else {
+ return defaultObj;
+ }
+ }
+
+ /**
+ * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+ *
+ * Example usage in java:
+ * Map<Integer, String> tasks;
+ * Map<String, List<Integer>> componentTasks =
Utils.reverse_map(tasks);
+ *
+ * @param map
+ * @return
+ */
+ public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+ HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
+ if (map == null) {
+ return rtn;
+ }
+ for (Entry<K, V> entry : map.entrySet()) {
+ K key = entry.getKey();
+ V val = entry.getValue();
+ List<K> list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<K>();
+ rtn.put(entry.getValue(), list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+ /**
+ * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+ *
+ */
+ public static HashMap reverseMap(List listSeq) {
+ HashMap<Object, List<Object>> rtn = new HashMap();
+ if (listSeq == null) {
+ return rtn;
+ }
+ for (Object entry : listSeq) {
+ List listEntry = (List) entry;
+ Object key = listEntry.get(0);
+ Object val = listEntry.get(1);
+ List list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<Object>();
+ rtn.put(val, list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+
+ /**
+ * Gets the pid of this JVM, because Java doesn't provide a real way
to do this.
+ *
+ * @return
+ */
+ public static String processPid() throws RuntimeException {
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ String[] split = name.split("@");
+ if (split.length != 2) {
+ throw new RuntimeException("Got unexpected process name: " +
name);
+ }
+ return split[0];
+ }
+
+ public static int execCommand(String command) throws ExecuteException,
IOException {
+ String[] cmdlist = command.split(" ");
+ CommandLine cmd = new CommandLine(cmdlist[0]);
+ for (int i = 1; i < cmdlist.length; i++) {
+ cmd.addArgument(cmdlist[i]);
+ }
+
+ DefaultExecutor exec = new DefaultExecutor();
+ return exec.execute(cmd);
+ }
+
+ /**
+ * Extra dir from the jar to destdir
+ *
+ * @param jarpath
+ * @param dir
+ * @param destdir
+ *
+ (with-open [jarpath (ZipFile. jarpath)]
+ (let [entries (enumeration-seq (.entries jarpath))]
+ (doseq [file (filter (fn [entry](and (not (.isDirectory entry))
(.startsWith (.getName entry) dir))) entries)]
+ (.mkdirs (.getParentFile (File. destdir (.getName file))))
+ (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
+ (io/copy (.getInputStream jarpath file) out)))))
+
+ */
+ public static void extractDirFromJar(String jarpath, String dir,
String destdir) {
+ JarFile jarFile = null;
+ FileOutputStream out = null;
+ InputStream in = null;
+ try {
+ jarFile = new JarFile(jarpath);
+ Enumeration<JarEntry> jarEnums = jarFile.entries();
+ while (jarEnums.hasMoreElements()) {
+ JarEntry entry = jarEnums.nextElement();
+ if (!entry.isDirectory() &&
entry.getName().startsWith(dir)) {
+ File aFile = new File(destdir, entry.getName());
+ aFile.getParentFile().mkdirs();
+ out = new FileOutputStream(aFile);
+ in = jarFile.getInputStream(entry);
+ IOUtils.copy(in, out);
+ out.close();
+ in.close();
+ }
+ }
+ } catch (IOException e) {
+ LOG.info("Could not extract {} from {}", dir, jarpath);
+ } finally {
+ if (jarFile != null) {
+ try {
+ jarFile.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Something really strange happened when trying
to close the jar file" + jarpath);
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Something really strange happened when trying
to close the output for jar file" + jarpath);
+ }
+ }
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Something really strange happened when trying
to close the input for jar file" + jarpath);
+ }
+ }
+ }
+
+ }
+
+ public static int sendSignalToProcess(long pid, int signum) {
+ int retval = 0;
+ try {
+ String killString = null;
+ if (isOnWindows()) {
+ if (signum == SIGKILL) {
+ killString = "taskkill /f /pid ";
+ } else {
+ killString = "taskkill /pid ";
+ }
+ } else {
+ killString = "kill -" + signum + " ";
+ }
+ killString = killString + pid;
+ retval = execCommand(killString);
+ } catch (ExecuteException e) {
+ LOG.info("Error when trying to kill " + pid + ". Process is
probably already dead.");
+ } catch (IOException e) {
+ LOG.info("IOException Error when trying to kill " + pid + ".");
+ } finally {
+ return retval;
+ }
+ }
+
+ public static int forceKillProcess (long pid) {
+ return sendSignalToProcess(pid, SIGKILL);
+ }
+
+ public static int forceKillProcess (String pid) {
+ return sendSignalToProcess(Long.parseLong(pid), SIGKILL);
+ }
+
+ public static int killProcessWithSigTerm (long pid) {
+ return sendSignalToProcess(pid, SIGTERM);
+ }
+ public static int killProcessWithSigTerm (String pid) {
+ return sendSignalToProcess(Long.parseLong(pid), SIGTERM);
+ }
+
+ /*
+ Adds the user supplied function as a shutdown hook for cleanup.
+ Also adds a function that sleeps for a second and then sends kill
-9
+ to process to avoid any zombie process in case cleanup function
hangs.
+ */
+ public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
+ Runnable sleepKill = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Time.sleepSecs(1);
+ Runtime.getRuntime().halt(20);
+ } catch (Exception e) {
+ LOG.warn("Exception in the ShutDownHook: " + e);
+ }
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(new Thread(func));
+ Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
+ }
+
+ /**
+ * Returns the combined string, escaped for posix shell.
+ * @param command the list of strings to be combined
+ * @return the resulting command string
+ */
+ public static String shellCmd (List<String> command) {
+ List<String> changedCommands = new LinkedList<>();
+ for (String str: command) {
+ if (str == null) {
+ continue;
+ }
+ changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") +
"'");
+ }
+ return StringUtils.join(changedCommands, " ");
+ }
+
+ public static String scriptFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
+ }
+
+ public static String containerFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
+ }
+
+ public static void throwRuntime (Object... strings) {
+ String combinedErrorMessage = "";
+ for (Object oneMessage: strings) {
+ combinedErrorMessage = combinedErrorMessage +
oneMessage.toString();
+ }
+ throw new RuntimeException(combinedErrorMessage);
+ }
+
+ public static Object nullToZero (Object v) {
+ return (v!=null? v : 0);
+ }
+
+ public static Object containerGet (Container container) {
+ return container.object;
+ }
+
+ public static Container containerSet (Container container, Object obj)
{
+ container.object = obj;
+ return container;
+ }
+
+
+
+ /**
+ * Deletes a file or directory and its contents if it exists. Does not
+ * complain if the input is null or does not exist.
+ * @param path the path to the file or directory
+ */
+ public static void forceDelete(String path) throws IOException {
+ _instance.forceDeleteImpl(path);
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected void forceDeleteImpl(String path) throws IOException {
+ LOG.debug("Deleting path {}", path);
+ if (checkFileExists(path)) {
+ try {
+ FileUtils.forceDelete(new File(path));
+ } catch (FileNotFoundException ignored) {}
+ }
+ }
+
+ /**
+ * Creates a symbolic link to the target
+ * @param dir the parent directory of the link
+ * @param targetDir the parent directory of the link's target
+ * @param targetFilename the file name of the links target
+ * @param filename the file name of the link
+ * @return the path of the link if it did not exist, otherwise null
+ * @throws IOException
+ */
+ public static Path createSymlink(String dir, String targetDir,
--- End diff --
The return value appears to be ignored everywhere it is used. For now it
is probably better to remove the return value.
> Port backtype.storm.util to java
> --------------------------------
>
> Key: STORM-1226
> URL: https://issues.apache.org/jira/browse/STORM-1226
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Reza Farivar
> Labels: java-migration, jstorm-merger
>
> Port backtype.storm.util from clojure to java. In as many instances as
> possible the same interface should be maintained, and calls to clojure
> functions in the rest of the code should be replaced with calls to the
> corresponding java code.
> Some similar functions can be found at
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
> although they are not identical.
> For function callbacks we may need to evaluate adding in appropriate callback
> interfaces instead. Please try to avoid using clojure internal java classes
> unless necessary.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)