[
https://issues.apache.org/jira/browse/STORM-1226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15139394#comment-15139394
]
ASF GitHub Bot commented on STORM-1226:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1074#discussion_r52354678
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1370,9 +1469,974 @@ public static int toPositive(int number) {
public static RuntimeException wrapInRuntime(Exception e){
if (e instanceof RuntimeException){
return (RuntimeException)e;
- }else {
+ } else {
return new RuntimeException(e);
}
}
-}
+ /**
+  * Determines if a zip archive contains a particular directory.
+  *
+  * @param zipfile path to the zipped file
+  * @param target directory being looked for in the zip.
+  * @return boolean whether or not the directory exists in the zip.
+  * @throws IOException if the archive cannot be opened or read
+  */
+ public static boolean zipDoesContainDir(String zipfile, String target) throws IOException {
+     String targetDir = target + "/";
+     // try-with-resources guarantees the archive's file handle is released;
+     // previously the ZipFile was created inline and never closed.
+     try (ZipFile zip = new ZipFile(zipfile)) {
+         Enumeration<? extends ZipEntry> entries = zip.entries();
+         while (entries.hasMoreElements()) {
+             // Any entry that lives under "target/" proves the directory exists.
+             if (entries.nextElement().getName().startsWith(targetDir)) {
+                 return true;
+             }
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Joins any number of maps into one map whose values are lists. Each list holds,
+  * in the order the maps were passed in, the value every input map has for that key;
+  * a null is inserted wherever an input map does not contain the key.
+  *
+  * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 5}) ->
+  *      {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 5]}
+  *
+  * @param maps variable number of maps to join - order affects order of values in output.
+  * @return combined map
+  */
+ public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
+     // Pass 1: collect the union of all keys.
+     Set<K> allKeys = new HashSet<>();
+     for (Map<K, V> source : maps) {
+         allKeys.addAll(source.keySet());
+     }
+
+     // Pass 2: for every input map, append its value (or null) to each key's list.
+     Map<K, List<V>> joined = new HashMap<>();
+     for (Map<K, V> source : maps) {
+         for (K k : allKeys) {
+             List<V> column = joined.get(k);
+             if (column == null) {
+                 column = new ArrayList<V>();
+                 joined.put(k, column);
+             }
+             column.add(source.get(k));
+         }
+     }
+     return joined;
+ }
+
+ /**
+  * Fills up chunks out of a collection (given a maximum amount of chunks)
+  *
+  * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
+  *      partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
+  *      partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
+  *
+  * The chunk layout comes from integerDivided(...); its result is consumed here as
+  * a map of "chunk size -> number of chunks of that size".
+  *
+  * @param maxNumChunks the maximum number of chunks to return
+  * @param coll the collection to be chunked up
+  * @return a list of the chunks, which are themselves lists. Empty when
+  *         maxNumChunks is 0 or coll is null.
+  */
+ public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
+     List<List<T>> chunks = new ArrayList<>();
+     if (coll == null || maxNumChunks == 0) {
+         return chunks;
+     }
+
+     Map<Integer, Integer> sizeToCount = integerDivided(coll.size(), maxNumChunks);
+
+     // Larger chunks are emitted first.
+     List<Integer> sizesDescending = new ArrayList<Integer>(sizeToCount.keySet());
+     Collections.sort(sizesDescending, Collections.reverseOrder());
+
+     Iterator<T> remaining = coll.iterator();
+     for (Integer size : sizesDescending) {
+         if (!remaining.hasNext()) {
+             break;
+         }
+         Integer count = sizeToCount.get(size);
+         for (int made = 0; made < count && remaining.hasNext(); made++) {
+             List<T> chunk = new ArrayList<>();
+             for (int taken = 0; taken < size && remaining.hasNext(); taken++) {
+                 chunk.add(remaining.next());
+             }
+             chunks.add(chunk);
+         }
+     }
+
+     return chunks;
+ }
+
+ /**
+ * Return a new instance of a pluggable specified in the conf.
+ * @param conf The conf to read from.
+ * @param configKey The key pointing to the pluggable class
+ * @return an instance of the class or null if it is not specified.
+ */
+ public static Object getConfiguredClass(Map conf, Object configKey) {
+ if (conf.containsKey(configKey)) {
+ return newInstance((String)conf.get(configKey));
+ }
+ return null;
+ }
+
+ public static String logsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) +
FILE_PATH_SEPARATOR + "worker.log";
+ }
+
+ public static String eventLogsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) +
FILE_PATH_SEPARATOR + "events.log";
+ }
+
+ public static Object readYamlFile(String yamlFile) {
+ try (FileReader reader = new FileReader(yamlFile)) {
+ return new Yaml(new SafeConstructor()).load(reader);
+ }
+ catch(Exception ex) {
+ LOG.error("Failed to read yaml file.", ex);
+ }
+ return null;
+ }
+
+ public static void setupDefaultUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread thread, Throwable
thrown) {
+ try {
+ handleUncaughtException(thrown);
+ }
+ catch (Error err) {
+ LOG.error("Received error in main thread..
terminating server...", err);
+ Runtime.getRuntime().exit(-2);
+ }
+ }
+ });
+ }
+
+ /**
+ * Creates a new map with a string value in the map replaced with an
+ * equivalently-lengthed string of '#'.
+ * @param m The map that a value will be redacted from
+ * @param key The key pointing to the value to be redacted
+ * @return a new map with the value redacted. The original map will
not be modified.
+ */
+ public static Map redactValue(Map<Object, String> m, Object key) {
+ if(m.containsKey(key)) {
+ HashMap<Object, String> newMap = new HashMap<>(m);
+ String value = newMap.get(key);
+ String redacted = new String(new
char[value.length()]).replace("\0", "#");
+ newMap.put(key, redacted);
+ return newMap;
+ }
+ return m;
+ }
+
+ public static void logThriftAccess(Integer requestId, InetAddress
remoteAddress, Principal principal, String operation) {
+ new ThriftAccessLogger().log(
+ String.format("Request ID: {} access from: {} principal: {}
operation: {}",
+ requestId, remoteAddress, principal, operation));
+ }
+
+ /**
+ * Make sure a given key name is valid for the storm config.
+ * Throw RuntimeException if the key isn't valid.
+ * @param name The name of the config key to check.
+ */
+ public static void validateKeyName(String name) {
+ Set<String> disallowedKeys = new HashSet<>();
+ disallowedKeys.add("/");
+ disallowedKeys.add(".");
+ disallowedKeys.add(":");
+ disallowedKeys.add("\\");
+
+ for(String key : disallowedKeys) {
+ if( name.contains(key) ) {
+ throw new RuntimeException("Key name cannot contain any of
the following: " + disallowedKeys.toString());
+ }
+ }
+ if(name.trim().isEmpty()) {
+ throw new RuntimeException("Key name cannot be blank");
+ }
+ }
+
+ // Everything from here on is translated from the old util.clj
+ // (storm-core/src/clj/backtype.storm/util.clj)
+
+ /** True when the "OS" environment variable is exactly "Windows_NT". */
+ public static final boolean IS_ON_WINDOWS =
"Windows_NT".equals(System.getenv("OS"));
+
+ /** Value of the "file.separator" system property, read once at class load. */
+ public static final String FILE_PATH_SEPARATOR =
System.getProperty("file.separator");
+
+ /** Value of the "path.separator" system property, read once at class load. */
+ public static final String CLASS_PATH_SEPARATOR =
System.getProperty("path.separator");
+
+ // Signal numbers passed to sendSignalToProcess(...).
+ public static final int SIGKILL = 9;
+ public static final int SIGTERM = 15;
+
+
+
+ /**
+  * Find the first item of coll for which pred.test(...) returns true.
+  *
+  * @param pred The IPredicate to test for
+  * @param coll The Collection of items to search through.
+  * @return The first matching value in coll, or null if nothing matches
+  *         (also null when either argument is null).
+  */
+ public static Object findFirst (IPredicate pred, Collection coll) {
+     if (pred == null || coll == null) {
+         return null;
+     }
+     Iterator<Object> candidates = coll.iterator();
+     if (candidates == null) {
+         return null;
+     }
+     while (candidates.hasNext()) {
+         Object candidate = candidates.next();
+         if (pred.test(candidate)) {
+             return candidate;
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Find the first entry of map for which pred.test(...) returns true. The
+  * predicate is handed the elements of map.entrySet(), not bare keys or values.
+  *
+  * @param pred The IPredicate to test for
+  * @param map The Map whose entries are searched.
+  * @return The first matching entry, or null if nothing matches
+  *         (also null when either argument is null).
+  */
+ public static Object findFirst (IPredicate pred, Map map) {
+     if (pred == null || map == null) {
+         return null;
+     }
+     Iterator<Object> entries = map.entrySet().iterator();
+     if (entries == null) {
+         return null;
+     }
+     while (entries.hasNext()) {
+         Object entry = entries.next();
+         if (pred.test(entry)) {
+             return entry;
+         }
+     }
+     return null;
+ }
+
+ /**
+  * Returns the local host name by delegating to _instance.localHostnameImpl().
+  *
+  * @throws UnknownHostException propagated from the instance implementation
+  */
+ public static String localHostname () throws UnknownHostException {
+ return _instance.localHostnameImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ /**
+  * Returns the canonical host name of InetAddress.getLocalHost().
+  *
+  * @throws UnknownHostException if the local host cannot be resolved
+  */
+ protected String localHostnameImpl () throws UnknownHostException {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ }
+
+ // Cache for memoizedLocalHostname(); null until the first successful lookup.
+ private static String memoizedLocalHostnameString = null;
+
+ /**
+  * Returns the result of localHostname(), looking it up only while the cache
+  * is still empty and reusing the cached string afterwards.
+  *
+  * @throws UnknownHostException if the first lookup fails
+  */
+ public static String memoizedLocalHostname () throws UnknownHostException {
+     if (memoizedLocalHostnameString != null) {
+         return memoizedLocalHostnameString;
+     }
+     memoizedLocalHostnameString = localHostname();
+     return memoizedLocalHostnameString;
+ }
+
+ /**
+ * Gets the storm.local.hostname value, or tries to figure out the
local hostname
+ * if it is not set in the config.
+ * @param conf The storm config to read from
+ * @return a string representation of the hostname.
+ */
+ public static String hostname (Map<String, Object> conf) throws
UnknownHostException {
+ if (conf == null) {
+ return memoizedLocalHostname();
+ }
+ Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+ if (hostnameString == null ) {
+ return memoizedLocalHostname();
+ }
+ if (hostnameString.equals("")) {
+ return memoizedLocalHostname();
+ }
+ return hostnameString.toString();
+ }
+
+ /** Returns the string form of a freshly generated random UUID. */
+ public static String uuid() {
+ return UUID.randomUUID().toString();
+ }
+
+ /**
+  * Converts seconds to milliseconds.
+  *
+  * @param secs a (possibly fractional) number of seconds
+  * @return 1000 * secs cast to long, i.e. the fractional millisecond part is dropped
+  */
+ public static long secsToMillisLong(double secs) {
+ return (long) (1000 * secs);
+ }
+
+ /**
+  * Splits a path on "/" and returns its non-empty segments in order.
+  *
+  * @param path the path to split; must not be null
+  * @return the segments, or null when splitting yields no tokens at all
+  *         (for example for "/" or "//")
+  */
+ public static Vector<String> tokenizePath (String path) {
+     String[] pieces = path.split("/");
+     // String.split never returns null, so an empty array is the only "no tokens" case.
+     if (pieces.length == 0) {
+         return null;
+     }
+     Vector<String> segments = new Vector<String>();
+     for (int i = 0; i < pieces.length; i++) {
+         if (pieces[i].isEmpty()) {
+             continue;
+         }
+         segments.add(pieces[i]);
+     }
+     return segments;
+ }
+
+ /**
+  * Returns the parent of a "/"-separated path: every segment except the last,
+  * each prefixed with "/".
+  *
+  * i.e. parentPath("/a/b/c") -> "/a/b", parentPath("/a") -> "/", parentPath("/") -> "/"
+  *
+  * @param path the path whose parent is wanted; may be null
+  * @return the parent path; "/" when path is null or has at most one segment
+  */
+ public static String parentPath(String path) {
+     if (path == null) {
+         return "/";
+     }
+     Vector<String> tokens = tokenizePath(path);
+     // tokenizePath returns null for inputs such as "/", which used to trigger a
+     // NullPointerException here. A single segment has the root as its parent;
+     // the previous loop produced "" for that case instead of "/".
+     if (tokens == null || tokens.size() <= 1) {
+         return "/";
+     }
+     StringBuilder parent = new StringBuilder();
+     // size - 1 to mimic "butlast" from the old clojure code
+     for (int i = 0; i < tokens.size() - 1; i++) {
+         parent.append('/').append(tokens.get(i));
+     }
+     return parent.toString();
+ }
+
+ /**
+  * Joins path segments back into a path, prefixing each segment with "/".
+  *
+  * @param toks the segments; may be null
+  * @return the joined path, or "/" when toks is null or empty
+  */
+ public static String toksToPath (Vector<String> toks) {
+     if (toks == null || toks.size() == 0) {
+         return "/";
+     }
+
+     StringBuilder joined = new StringBuilder();
+     for (String segment : toks) {
+         joined.append("/").append(segment);
+     }
+     return joined.toString();
+ }
+ /**
+  * Normalizes a path by tokenizing it with tokenizePath(...) and re-joining the
+  * tokens with toksToPath(...).
+  */
+ public static String normalizePath (String path) {
+ return toksToPath(tokenizePath(path));
+ }
+
+ public static void exitProcess (int val, Object... msg) {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append("halting process: ");
+ for (Object oneMessage: msg) {
+ errorMessage.append(oneMessage);
+ }
+ String combinedErrorMessage = errorMessage.toString();
+ LOG.error(combinedErrorMessage, new
RuntimeException(combinedErrorMessage));
+ Runtime.getRuntime().exit(val);
+ }
+
+ /**
+  * Returns val unless it is null, in which case defaultObj is returned.
+  */
+ public static Object defaulted(Object val, Object defaultObj) {
+     return (val == null) ? defaultObj : val;
+ }
+
+ /**
+  * Inverts a map: every distinct value becomes a key that maps to the list of
+  * original keys carrying that value.
+  *
+  * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}"
+  *
+  * Example usage in java:
+  *     Map<Integer, String> tasks;
+  *     Map<String, List<Integer>> componentTasks = Utils.reverseMap(tasks);
+  *
+  * @param map the map to invert; may be null
+  * @return the inverted map; empty when map is null
+  */
+ public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+     HashMap<V, List<K>> inverted = new HashMap<V, List<K>>();
+     if (map != null) {
+         for (Entry<K, V> pair : map.entrySet()) {
+             V value = pair.getValue();
+             List<K> keysForValue = inverted.get(value);
+             if (keysForValue == null) {
+                 keysForValue = new ArrayList<K>();
+                 inverted.put(value, keysForValue);
+             }
+             keysForValue.add(pair.getKey());
+         }
+     }
+     return inverted;
+ }
+
+ /**
+  * Inverts a sequence of [key, value] pairs: every distinct value becomes a key
+  * that maps to the list of keys paired with it.
+  *
+  * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}"
+  *
+  * @param listSeq a list whose elements are themselves Lists; element 0 is read
+  *                as the key and element 1 as the value. May be null.
+  * @return the inverted map; empty when listSeq is null
+  */
+ public static HashMap reverseMap(List listSeq) {
+     HashMap<Object, List<Object>> inverted = new HashMap();
+     if (listSeq != null) {
+         for (Object element : listSeq) {
+             List pair = (List) element;
+             Object first = pair.get(0);
+             Object second = pair.get(1);
+             List keysForValue = inverted.get(second);
+             if (keysForValue == null) {
+                 keysForValue = new ArrayList<Object>();
+                 inverted.put(second, keysForValue);
+             }
+             keysForValue.add(first);
+         }
+     }
+     return inverted;
+ }
+
+
+ /**
+  * Gets the pid of this JVM, because Java doesn't provide a real way to do this.
+  * The pid is taken from the runtime MXBean name, which is expected to have the
+  * form "pid@host".
+  *
+  * @return the part of the MXBean name before the "@"
+  * @throws RuntimeException if the name does not split into exactly two parts on "@"
+  */
+ public static String processPid() throws RuntimeException {
+     String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
+     String[] pidAndHost = runtimeName.split("@");
+     if (pidAndHost.length == 2) {
+         return pidAndHost[0];
+     }
+     throw new RuntimeException("Got unexpected process name: " + runtimeName);
+ }
+
+ /**
+  * Runs a command through a DefaultExecutor and returns whatever execute(...) returns.
+  * The command string is split on single spaces: the first token is the executable,
+  * the remaining tokens are added as arguments one by one.
+  *
+  * @param command the space-separated command line
+  * @return the value returned by DefaultExecutor.execute
+  * @throws ExecuteException propagated from the executor
+  * @throws IOException propagated from the executor
+  */
+ public static int execCommand(String command) throws ExecuteException, IOException {
+     String[] parts = command.split(" ");
+     CommandLine commandLine = new CommandLine(parts[0]);
+     int argIndex = 1;
+     while (argIndex < parts.length) {
+         commandLine.addArgument(parts[argIndex]);
+         argIndex++;
+     }
+
+     DefaultExecutor executor = new DefaultExecutor();
+     return executor.execute(commandLine);
+ }
+
+ /**
+ * Extra dir from the jar to destdir
+ *
+ * @param jarpath
+ * @param dir
+ * @param destdir
+ *
+ (with-open [jarpath (ZipFile. jarpath)]
+ (let [entries (enumeration-seq (.entries jarpath))]
+ (doseq [file (filter (fn [entry](and (not (.isDirectory entry))
(.startsWith (.getName entry) dir))) entries)]
+ (.mkdirs (.getParentFile (File. destdir (.getName file))))
+ (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
+ (io/copy (.getInputStream jarpath file) out)))))
+
+ */
+ public static void extractDirFromJar(String jarpath, String dir,
String destdir) {
+ JarFile jarFile = null;
+ FileOutputStream out = null;
+ InputStream in = null;
+ try {
+ jarFile = new JarFile(jarpath);
+ Enumeration<JarEntry> jarEnums = jarFile.entries();
+ while (jarEnums.hasMoreElements()) {
+ JarEntry entry = jarEnums.nextElement();
+ if (!entry.isDirectory() &&
entry.getName().startsWith(dir)) {
+ File aFile = new File(destdir, entry.getName());
+ aFile.getParentFile().mkdirs();
+ out = new FileOutputStream(aFile);
+ in = jarFile.getInputStream(entry);
+ IOUtils.copy(in, out);
+ out.close();
+ in.close();
+ }
+ }
+ } catch (IOException e) {
+ LOG.info("Could not extract {} from {}", dir, jarpath);
+ } finally {
+ if (jarFile != null) {
+ try {
+ jarFile.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Something really strange happened when trying
to close the jar file" + jarpath);
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Something really strange happened when trying
to close the output for jar file" + jarpath);
+ }
+ }
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Something really strange happened when trying
to close the input for jar file" + jarpath);
+ }
+ }
+ }
+
+ }
+
+ public static int sendSignalToProcess(long pid, int signum) {
+ int retval = 0;
+ try {
+ String killString = null;
+ if (isOnWindows()) {
+ if (signum == SIGKILL) {
+ killString = "taskkill /f /pid ";
+ } else {
+ killString = "taskkill /pid ";
+ }
+ } else {
+ killString = "kill -" + signum + " ";
+ }
+ killString = killString + pid;
+ retval = execCommand(killString);
+ } catch (ExecuteException e) {
+ LOG.info("Error when trying to kill " + pid + ". Process is
probably already dead.");
+ } catch (IOException e) {
+ LOG.info("IOException Error when trying to kill " + pid + ".");
+ } finally {
+ return retval;
+ }
+ }
+
+ /** Sends SIGKILL to the given pid via sendSignalToProcess and returns its result. */
+ public static int forceKillProcess (long pid) {
+ return sendSignalToProcess(pid, SIGKILL);
+ }
+
+ /**
+  * Same as forceKillProcess(long), for a pid given as a decimal string.
+  * Long.parseLong throws NumberFormatException if pid is not a valid long.
+  */
+ public static int forceKillProcess (String pid) {
+ return sendSignalToProcess(Long.parseLong(pid), SIGKILL);
+ }
+
+ /** Sends SIGTERM to the given pid via sendSignalToProcess and returns its result. */
+ public static int killProcessWithSigTerm (long pid) {
+ return sendSignalToProcess(pid, SIGTERM);
+ }
+ /**
+  * Same as killProcessWithSigTerm(long), for a pid given as a decimal string.
+  * Long.parseLong throws NumberFormatException if pid is not a valid long.
+  */
+ public static int killProcessWithSigTerm (String pid) {
+ return sendSignalToProcess(Long.parseLong(pid), SIGTERM);
+ }
+
+ /**
+  * Adds the user supplied function as a shutdown hook for cleanup.
+  * Also adds a second hook that sleeps for a second and then halts the JVM
+  * (Runtime.halt(20)), to avoid any zombie process in case the cleanup
+  * function hangs.
+  *
+  * @param func the cleanup code to run as a shutdown hook
+  */
+ public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
+ Runnable sleepKill = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Time.sleepSecs(1);
+ // halt() terminates the JVM immediately, with exit status 20
+ Runtime.getRuntime().halt(20);
+ } catch (Exception e) {
+ LOG.warn("Exception in the ShutDownHook: " + e);
+ }
+ }
+ };
+ // Both hooks are registered as separate threads.
+ Runtime.getRuntime().addShutdownHook(new Thread(func));
+ Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
+ }
+
+ /**
+ * Returns the combined string, escaped for posix shell.
+ * @param command the list of strings to be combined
+ * @return the resulting command string
+ */
+ public static String shellCmd (List<String> command) {
+ List<String> changedCommands = new LinkedList<>();
+ for (String str: command) {
+ if (str == null) {
+ continue;
+ }
+ changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") +
"'");
+ }
+ return StringUtils.join(changedCommands, " ");
+ }
+
+ /** Returns dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh". */
+ public static String scriptFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
+ }
+
+ /** Returns dir + FILE_PATH_SEPARATOR + "launch_container.sh". */
+ public static String containerFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
+ }
+
+ /**
+  * Always throws a RuntimeException whose message is the concatenation of the
+  * string forms of all arguments, in order.
+  *
+  * @param strings message fragments; a null fragment is rendered as "null"
+  * @throws RuntimeException always
+  */
+ public static void throwRuntime (Object... strings) {
+     StringBuilder combinedErrorMessage = new StringBuilder();
+     for (Object oneMessage : strings) {
+         // append(Object) is null-safe; calling oneMessage.toString() directly
+         // replaced the intended error with a NullPointerException.
+         combinedErrorMessage.append(oneMessage);
+     }
+     throw new RuntimeException(combinedErrorMessage.toString());
+ }
+
+ /** Returns v unchanged unless it is null, in which case the Integer 0 is returned. */
+ public static Object nullToZero (Object v) {
+     if (v == null) {
+         return 0;
+     }
+     return v;
+ }
+
+ /** Returns the value currently stored in the container's object field. */
+ public static Object containerGet (Container container) {
+ return container.object;
+ }
+
+ /**
+  * Stores obj in the container's object field.
+  *
+  * @return the same container instance, to allow chaining
+  */
+ public static Container containerSet (Container container, Object obj)
{
+ container.object = obj;
+ return container;
+ }
+
+
+
+ /**
+  * Deletes a file or directory and its contents if it exists. Does not
+  * complain if the path does not exist. Delegates to
+  * _instance.forceDeleteImpl(path).
+  * NOTE(review): null handling depends on checkFileExists, which is not
+  * visible here - confirm before relying on null being accepted.
+  * @param path the path to the file or directory
+  * @throws IOException propagated from the underlying delete
+  */
+ public static void forceDelete(String path) throws IOException {
+ _instance.forceDeleteImpl(path);
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected void forceDeleteImpl(String path) throws IOException {
+ LOG.debug("Deleting path {}", path);
+ if (checkFileExists(path)) {
+ try {
+ FileUtils.forceDelete(new File(path));
+ // The path may vanish between the existence check and the delete;
+ // that outcome is what the caller wanted, so it is ignored.
+ } catch (FileNotFoundException ignored) {}
+ }
+ }
+
+ /**
+  * Creates a symbolic link to the target
+  *
+  * @param dir the parent directory of the link
+  * @param targetDir the parent directory of the link's target
+  * @param targetFilename the file name of the links target
+  * @param filename the file name of the link
+  * @return the path of the link if it did not exist, otherwise null
+  * @throws IOException if the link cannot be created
+  */
+ public static Path createSymlink(String dir, String targetDir,
+         String targetFilename, String filename) throws IOException {
+     Path link = Paths.get(dir, filename).toAbsolutePath();
+     Path linkTarget = Paths.get(targetDir, targetFilename).toAbsolutePath();
+     LOG.debug("Creating symlink [{}] to [{}]", link, linkTarget);
+     if (link.toFile().exists()) {
+         // Something already lives at the link location: leave it untouched.
+         return null;
+     }
+     return Files.createSymbolicLink(link, linkTarget);
+ }
+
+ /**
+  * Convenience method for the case when the link's file name should be the
+  * same as the file name of the target
+  *
+  * @param dir the parent directory of the link
+  * @param targetDir the parent directory of the link's target
+  * @param targetFilename the file name of both the link and its target
+  * @return whatever the four-argument createSymlink returns
+  * @throws IOException propagated from the four-argument createSymlink
+  */
+ public static Path createSymlink(String dir, String targetDir,
+ String targetFilename) throws
IOException {
+ return Utils.createSymlink(dir, targetDir, targetFilename,
+ targetFilename);
+ }
+
+ /**
+  * Returns a Collection of file names found under the given directory.
+  *
+  * @param dir a directory
+  * @return the names (not full paths) of the directory's direct children; empty
+  *         when File.listFiles() returns null for dir
+  */
+ public static Collection<String> readDirContents(String dir) {
+     Collection<String> names = new HashSet<>();
+     File[] children = new File(dir).listFiles();
+     if (children == null) {
+         return names;
+     }
+     for (int i = 0; i < children.length; i++) {
+         names.add(children[i].getName());
+     }
+     return names;
+ }
+
+ /**
+  * Returns the value of java.class.path System property. Kept separate for
+  * testing. Delegates to _instance.currentClasspathImpl().
+  * @return the classpath
+  */
+ public static String currentClasspath() {
+ return _instance.currentClasspathImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ /** Reads the "java.class.path" system property. */
+ public String currentClasspathImpl() {
+ return System.getProperty("java.class.path");
+ }
+
+ /**
+  * Returns the paths of the jar files found directly under the given directory.
+  * Each path is dir joined with the file name; a file counts as a jar when its
+  * name ends with ".jar".
+  *
+  * @param dir the directory to search
+  * @return the jar paths; empty when File.listFiles returns null for dir
+  */
+ private static List<String> getFullJars(String dir) {
+     FilenameFilter jarsOnly = new FilenameFilter() {
+         @Override
+         public boolean accept(File dir, String name) {
+             return name.endsWith(".jar");
+         }
+     };
+     File[] jars = new File(dir).listFiles(jarsOnly);
+
+     if (jars == null) {
+         return new ArrayList<>();
+     }
+
+     List<String> jarPaths = new ArrayList<>(jars.length);
+     for (int i = 0; i < jars.length; i++) {
+         String jarName = jars[i].getName();
+         jarPaths.add(Paths.get(dir, jarName).toString());
+     }
+     return jarPaths;
+ }
+
+ /**
+  * Builds the classpath for a worker process.
+  *
+  * When the "storm.home" system property is not set, the current classpath is
+  * returned. Otherwise the result is, joined with CLASS_PATH_SEPARATOR: the jars
+  * under storm.home/lib, the jars under storm.home/extlib, the value of the
+  * STORM_EXT_CLASSPATH environment variable, and the conf directory
+  * (STORM_CONF_DIR if set, else storm.home/conf).
+  *
+  * @return the worker classpath string
+  */
+ public static String workerClasspath() {
+     String stormDir = System.getProperty("storm.home");
+     // This check must come before any Paths.get(stormDir, ...) call: Paths.get
+     // throws NullPointerException on a null first argument, which previously
+     // made this fallback unreachable.
+     if (stormDir == null) {
+         return Utils.currentClasspath();
+     }
+
+     String stormLibDir = Paths.get(stormDir, "lib").toString();
+     String stormExtlibDir = Paths.get(stormDir, "extlib").toString();
+     String confDirFromEnv = System.getenv("STORM_CONF_DIR");
+     String stormConfDir = confDirFromEnv != null
+             ? confDirFromEnv
+             : Paths.get(stormDir, "conf").toString();
+     String extcp = System.getenv("STORM_EXT_CLASSPATH");
+
+     List<String> pathElements = new LinkedList<>();
+     pathElements.addAll(Utils.getFullJars(stormLibDir));
+     pathElements.addAll(Utils.getFullJars(stormExtlibDir));
+     // NOTE(review): extcp is added even when the variable is unset (null), as
+     // before; confirm how StringUtils.join renders a null element.
+     pathElements.add(extcp);
+     pathElements.add(stormConfDir);
+
+     return StringUtils.join(pathElements, CLASS_PATH_SEPARATOR);
+ }
+
+ public static String addToClasspath(String classpath,
+ Collection<String> paths) {
+ return _instance.addToClasspathImpl(classpath, paths);
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ public String addToClasspathImpl(String classpath,
+ Collection<String> paths) {
+ if (paths == null || paths.isEmpty()) {
+ return classpath;
+ }
+ List<String> l = new LinkedList<>();
+ l.add(classpath);
+ l.addAll(paths);
+ return StringUtils.join(l, CLASS_PATH_SEPARATOR);
+ }
+
+ /**
+  * Remembers the value of Time.currentTimeSecs() at construction and reports
+  * Time.delta(startTime) on demand.
+  */
+ public static class UptimeComputer {
+ // Value of Time.currentTimeSecs() captured by the constructor.
+ int startTime = 0;
+
+ public UptimeComputer() {
+ startTime = Time.currentTimeSecs();
+ }
+
+ /**
+  * @return Time.delta(startTime) - presumably the seconds elapsed since
+  *         construction; confirm against Time.delta.
+  */
+ public int upTime() {
+ return Time.delta(startTime);
+ }
+ }
+
+ /** Creates an UptimeComputer by delegating to _instance.makeUptimeComputerImpl(). */
+ public static UptimeComputer makeUptimeComputer() {
+ return _instance.makeUptimeComputerImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ /** Returns a new UptimeComputer. */
+ public UptimeComputer makeUptimeComputerImpl() {
+ return new UptimeComputer();
+ }
+
+ /**
+  * Writes a posix shell script file to be executed in its own process. The script
+  * starts with "#!/bin/bash", then has one shell-escaped "export KEY=VALUE;" line
+  * per environment entry (a null value is written as the empty string), and ends
+  * with an "exec" of the shell-escaped command.
+  *
+  * @param dir the directory under which the script is to be written
+  * @param command the command the script is to execute
+  * @param environment optional environment variables to set before running the
+  *                    script's command. May be null.
+  * @return the path to the script that has been written
+  * @throws RuntimeException wrapping the IOException if the file cannot be written
+  */
+ public static String writeScript(String dir, List<String> command,
+         Map<String,String> environment) {
+     String scriptPath = Utils.scriptFilePath(dir);
+     try (BufferedWriter writer = new BufferedWriter(new FileWriter(scriptPath))) {
+         writer.write("#!/bin/bash");
+         writer.newLine();
+         if (environment != null) {
+             for (Map.Entry<String, String> envVar : environment.entrySet()) {
+                 String value = envVar.getValue();
+                 if (value == null) {
+                     value = "";
+                 }
+                 String assignment = envVar.getKey() + "=" + value;
+                 writer.write(Utils.shellCmd(Arrays.asList("export", assignment)));
+                 writer.write(";");
+                 writer.newLine();
+             }
+         }
+         writer.newLine();
+         writer.write("exec " + Utils.shellCmd(command) + ";");
+     } catch (IOException io) {
+         throw new RuntimeException("Could not write posix script file", io);
+     }
+     return scriptPath;
+ }
+
+ /**
+  * A thread that can answer if it is sleeping in the case of simulated time.
+  * This class is not useful when simulated time is not being used.
+  */
+ public static class SmartThread extends Thread {
+ /** Returns Time.isThreadWaiting(this). */
+ public boolean isSleeping() {
+ return Time.isThreadWaiting(this);
+ }
+ /**
+  * @param r the code this thread runs, passed straight to the Thread constructor
+  */
+ public SmartThread(Runnable r) {
+ super(r);
+ }
+ }
+
+ /**
+ * Creates a thread that calls the given code repeatedly, sleeping for
an
+ * interval of seconds equal to the return value of the previous call.
+ *
+ * The given afn may be a callable that returns the number of seconds
to
+ * sleep, or it may be a Callable that returns another Callable that
in turn
+ * returns the number of seconds to sleep. In the latter case
isFactory.
+ *
+ * @param afn the code to call on each iteration
+ * @param isDaemon whether the new thread should be a daemon thread
+ * @param eh code to call when afn throws an exception
+ * @param priority the new thread's priority see
+ * @param isFactory whether afn returns a callable instead of sleep
seconds
+ * @param startImmediately whether to start the thread before returning
+ * @param threadName a suffix to be appended to the thread name
+ * @return the newly created thread
+ * @see java.lang.Thread
+ */
+ public static SmartThread asyncLoop(final Callable afn,
+ boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
+ int priority, final boolean isFactory, boolean
startImmediately,
+ String threadName) {
+ SmartThread thread = new SmartThread(new Runnable() {
+ public void run() {
+ Object s;
+ try {
+ Callable fn = isFactory ? (Callable) afn.call() : afn;
+ while ((s = fn.call()) instanceof Long) {
+ Time.sleepSecs((Long) s);
+ }
+ } catch (Throwable t) {
+ if (Utils.exceptionCauseIsInstanceOf(
+ InterruptedException.class, t)) {
+ LOG.info("Async loop interrupted!");
+ return;
+ }
+ LOG.error("Async loop died!", t);
+ throw new RuntimeException(t);
+ }
+ }
+ });
+ if (eh != null) {
+ thread.setUncaughtExceptionHandler(eh);
+ } else {
+ thread.setUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ Utils.exitProcess(1, "Async loop died!");
+ }
+ });
+ }
--- End diff --
The uncaught exception handlers are not the problem. The code in them is
fine. The issue is that we are installing them on the thread, and that, because
of Java's exception checking, a Runnable cannot throw arbitrary exceptions. As
such we are catching all Throwables, and things that are not caused by
InterruptedException are wrapped in a RuntimeException before being rethrown.
This last part of wrapping the Throwable in a RuntimeException was not
happening before. This means that any code that was looking for a FooException
would now find a RuntimeException with Foo as the caused by. We usually are
fairly good about checking the caused by so we probably don't care, but then
again this is error code so it is not tested as well as the other code, so it
might cause an issue that we are not aware of.
> Port backtype.storm.util to java
> --------------------------------
>
> Key: STORM-1226
> URL: https://issues.apache.org/jira/browse/STORM-1226
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Reza Farivar
> Labels: java-migration, jstorm-merger
>
> Port backtype.storm.util from clojure to java. In as many instances as
> possible the same interface should be maintained, and calls to clojure
> functions in the rest of the code should be replaced with calls to the
> corresponding java code.
> Some similar functions can be found at
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
> Although they are not identical.
> For function callbacks we may need to evaluate adding in appropriate callback
> interfaces instead. Please try to avoid using clojure internal java classes
> unless necessary.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)