Github user hustfxj commented on a diff in the pull request:

    https://github.com/apache/storm/pull/662#discussion_r45188898
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/StormCommandExecutor.java 
---
    @@ -0,0 +1,868 @@
    +package backtype.storm.utils;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.InputStream;
    +import java.io.InputStreamReader;
    +import java.lang.reflect.InvocationTargetException;
    +import java.lang.reflect.Method;
    +import java.nio.charset.StandardCharsets;
    +import java.util.*;
    +
    +import clojure.lang.IFn;
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.commons.lang.SystemUtils;
    +
    +/**
    + * Created by pshah on 7/17/15.
    + */
    +abstract class StormCommandExecutor {
    +    final String NIMBUS_CLASS = "backtype.storm.daemon.nimbus";
    +    final String SUPERVISOR_CLASS = "backtype.storm.daemon.supervisor";
    +    final String UI_CLASS = "backtype.storm.ui.core";
    +    final String LOGVIEWER_CLASS = "backtype.storm.daemon.logviewer";
    +    final String DRPC_CLASS = "backtype.storm.daemon.drpc";
    +    final String REPL_CLASS = "clojure.main";
    +    final String ACTIVATE_CLASS = "backtype.storm.command.activate";
    +    final String DEACTIVATE_CLASS = "backtype.storm.command.deactivate";
    +    final String REBALANCE_CLASS = "backtype.storm.command.rebalance";
    +    final String LIST_CLASS = "backtype.storm.command.list";
    +    final String DEVZOOKEEPER_CLASS = 
"backtype.storm.command.dev_zookeeper";
    +    final String VERSION_CLASS = "backtype.storm.utils.VersionInfo";
    +    final String MONITOR_CLASS = "backtype.storm.command.monitor";
    +    final String UPLOADCREDENTIALS_CLASS = "backtype.storm.command" +
    +            ".upload_credentials";
    +    final String GETERRORS_CLASS = "backtype.storm.command.get_errors";
    +    final String SHELL_CLASS = "backtype.storm.command.shell_submission";
    +    String stormHomeDirectory;
    +    String userConfDirectory;
    +    String stormConfDirectory;
    +    String clusterConfDirectory;
    +    String stormLibDirectory;
    +    String stormBinDirectory;
    +    String stormLog4jConfDirectory;
    +    String configFile = "";
    +    String javaCommand;
    +    List<String> configOptions = new ArrayList<String>();
    +    String stormExternalClasspath;
    +    String stormExternalClasspathDaemon;
    +    String fileSeparator;
    +    final List<String> COMMANDS = Arrays.asList("jar", "kill", "shell",
    +            "nimbus", "ui", "logviewer", "drpc", "supervisor",
    +            "localconfvalue",  "remoteconfvalue", "repl", "classpath",
    +            "activate", "deactivate", "rebalance", "help",  "list",
    +            "dev-zookeeper", "version", "monitor", "upload-credentials",
    +            "get-errors");
    +
    +    public static void main (String[] args) {
    +        for (String arg : args) {
    +            System.out.println("Argument ++ is " + arg);
    +        }
    +        StormCommandExecutor stormCommandExecutor;
    +        if (System.getProperty("os.name").startsWith("Windows")) {
    +            stormCommandExecutor = new WindowsStormCommandExecutor();
    +        } else {
    +            stormCommandExecutor = new UnixStormCommandExecutor();
    +        }
    +        stormCommandExecutor.initialize();
    +        stormCommandExecutor.execute(args);
    +    }
    +
    +    StormCommandExecutor () {
    +
    +    }
    +
    +    abstract void initialize ();
    +
    +    abstract void execute (String[] args);
    +
    +    void callMethod (String command, List<String> args) {
    +        Class implementation = this.getClass();
    +        String methodName = command.replace("-", "") + "Command";
    +        try {
    +            Method method = implementation.getDeclaredMethod(methodName, 
List
    +                    .class);
    +            method.invoke(this, args);
    +        } catch (NoSuchMethodException ex) {
    +            System.out.println("No such method exception occured while 
trying" +
    +                    " to run storm method " + command);
    +        } catch (IllegalAccessException ex) {
    +            System.out.println("Illegal access exception occured while 
trying" +
    +                    " to run storm method " + command);
    +        } catch (IllegalArgumentException ex) {
    +            System.out.println("Illegal argument exception occured while " 
+
    +                    "trying" + " to run storm method " + command);
    +        } catch (InvocationTargetException ex) {
    +            System.out.println("Invocation target exception occured while 
" +
    +                    "trying" + " to run storm method " + command);
    +        }
    +    }
    +}
    +
    +class UnixStormCommandExecutor extends StormCommandExecutor {
    +
    +    /** Does nothing; the fields are populated by initialize(). */
    +    UnixStormCommandExecutor () {
    +    }
    +
    +    /**
    +     * Resolves the directories, the java executable and the external
    +     * classpaths from environment variables and system properties.
    +     * Exits the JVM with status 1 when JAVA_HOME is set but has no
    +     * bin/java, or when the lib directory under STORM_BASE_DIR is missing.
    +     */
    +    void initialize () {
    +        Collections.sort(this.COMMANDS);
    +        this.fileSeparator = System.getProperty("file.separator");
    +        this.stormHomeDirectory = System.getenv("STORM_BASE_DIR");
    +        this.userConfDirectory = System.getProperty("user.home") +
    +                this.fileSeparator + ".storm";
    +        this.stormConfDirectory = System.getenv("STORM_CONF_DIR");
    +        this.clusterConfDirectory = this.stormConfDirectory == null ?
    +                (this.stormHomeDirectory + this.fileSeparator + "conf") :
    +                this.stormConfDirectory;
    +        // Without a storm.yaml in the user's .storm directory, use the
    +        // cluster configuration directory as the user one too.
    +        File f = new File(this.userConfDirectory + this.fileSeparator +
    +                "storm.yaml");
    +        if (!f.isFile()) {
    +            this.userConfDirectory = this.clusterConfDirectory;
    +        }
    +        this.stormLibDirectory = this.stormHomeDirectory +
    +                this.fileSeparator + "lib";
    +        this.stormBinDirectory = this.stormHomeDirectory +
    +                this.fileSeparator + "bin";
    +        this.stormLog4jConfDirectory = this.stormHomeDirectory +
    +                this.fileSeparator + "log4j2";
    +        String javaHome = System.getenv("JAVA_HOME");
    +        if (javaHome != null) {
    +            this.javaCommand = javaHome + this.fileSeparator + "bin" +
    +                    this.fileSeparator + "java";
    +            if (!(new File(this.javaCommand).exists())) {
    +                System.out.println("ERROR:  JAVA_HOME is invalid.  Could " +
    +                        "not find " + this.javaCommand);
    +                System.exit(1);
    +            }
    +        } else {
    +            // No JAVA_HOME: rely on whatever "java" the PATH resolves to.
    +            this.javaCommand = "java";
    +        }
    +        this.stormExternalClasspath = System.getenv("STORM_EXT_CLASSPATH");
    +        this.stormExternalClasspathDaemon =
    +                System.getenv("STORM_EXT_CLASSPATH_DAEMON");
    +        if (!(new File(this.stormLibDirectory).exists())) {
    +            System.out.println(
    +                    "******************************************");
    +            System.out.println("The storm client can only be run from " +
    +                    "within a release. You appear to be trying to run " +
    +                    "the client from a checkout of Storm's source code.");
    +            System.out.println("You can download a Storm release at " +
    +                    "http://storm-project.net/downloads.html");
    +            System.out.println(
    +                    "******************************************");
    +            System.exit(1);
    +        }
    +    }
    +
    +    /**
    +     * Parses the command line and dispatches the command. Every
    +     * "-c <option>" pair is collected into configOptions, a
    +     * "--config <file>" pair sets configFile, and the remaining arguments
    +     * are the command followed by its own arguments. Prints the usage and
    +     * exits with -1 when there are no arguments, and with 254 when the
    +     * command is unknown or an option is missing its value.
    +     *
    +     * @param args the raw command line arguments
    +     */
    +    void execute (String[] args) {
    +        if (args.length == 0) {
    +            this.printUsage();
    +            System.exit(-1);
    +        }
    +        List<String> commandArgs = new ArrayList<String>();
    +        for (int i = 0; i < args.length; ++i) {
    +            // Compare by value: == on strings only matches the very same
    +            // object, so the options would never be recognised.
    +            boolean isConfigOption = "-c".equals(args[i]);
    +            boolean isConfigFile = "--config".equals(args[i]);
    +            if ((isConfigOption || isConfigFile) && (i + 1 >= args.length)) {
    +                System.out.println("Missing value for option " + args[i]);
    +                this.printUsage();
    +                System.exit(254);
    +            }
    +            if (isConfigOption) {
    +                this.configOptions.add(args[++i]);
    +            } else if (isConfigFile) {
    +                this.configFile = args[++i];
    +            } else {
    +                commandArgs.add(args[i]);
    +            }
    +        }
    +        if (commandArgs.isEmpty() ||
    +                !this.COMMANDS.contains(commandArgs.get(0))) {
    +            System.out.println("Unknown command: [storm " +
    +                    StringUtils.join(args, " ") +  "]");
    +            this.printUsage();
    +            System.exit(254);
    +        }
    +        this.callMethod(commandArgs.get(0), commandArgs.subList(1,
    +                commandArgs.size()));
    +    }
    +
    +    /**
    +     * Builds the "-Dstorm.options=" JVM argument from the entries of
    +     * configOptions, joined with commas.
    +     *
    +     * @return the argument; just "-Dstorm.options=" when the list is empty
    +     */
    +    String getConfigOptions() {
    +        //TODO  - do urlencode here. python does quote_plus to each
    +        // configoption
    +        String joinedOptions = StringUtils.join(this.configOptions, ',');
    +        return "-Dstorm.options=" + joinedOptions;
    +    }
    +
    +    /**
    +     * Lists the entries directly inside a directory whose name ends in
    +     * ".jar".
    +     *
    +     * @param directory path of the directory to scan (not recursive)
    +     * @return the paths of the matching entries; empty when the directory
    +     *         cannot be listed
    +     */
    +    List<String> getJarsFull (String directory) {
    +        List<String> jarPaths = new ArrayList<String>();
    +        File[] entries = new File(directory).listFiles();
    +        if (entries == null) {
    +            // listFiles() returns null for a missing or unreadable path.
    +            return jarPaths;
    +        }
    +        for (int i = 0; i < entries.length; i++) {
    +            File entry = entries[i];
    +            if (!entry.getName().endsWith(".jar")) {
    +                continue;
    +            }
    +            jarPaths.add(entry.getPath());
    +        }
    +        return jarPaths;
    +    }
    +
    +    /**
    +     * Assembles the classpath string for a child JVM. In order: the jars in
    +     * the storm home, lib and extlib directories, the jars in extlib-daemon
    +     * when daemon is set, the two external classpath values when they are
    +     * not null, and finally the extra jars.
    +     *
    +     * @param extraJars additional entries appended at the end
    +     * @param daemon    whether to include the extlib-daemon jars
    +     * @return the entries joined with the platform path separator
    +     */
    +    String getClassPath (List<String> extraJars, boolean daemon) {
    +        String home = this.stormHomeDirectory;
    +        List<String> entries = this.getJarsFull(home);
    +        entries.addAll(this.getJarsFull(this.stormLibDirectory));
    +        entries.addAll(this.getJarsFull(home + this.fileSeparator +
    +                "extlib"));
    +        if (daemon) {
    +            entries.addAll(this.getJarsFull(home + this.fileSeparator +
    +                    "extlib-daemon"));
    +        }
    +        if (null != this.stormExternalClasspath) {
    +            entries.add(this.stormExternalClasspath);
    +        }
    +        if (null != this.stormExternalClasspathDaemon) {
    +            entries.add(this.stormExternalClasspathDaemon);
    +        }
    +        entries.addAll(extraJars);
    +        String separator = System.getProperty("path.separator");
    +        return StringUtils.join(entries, separator);
    +    }
    +
    +    /**
    +     * Reads one configuration value by running
    +     * backtype.storm.command.config_value in a child JVM and picking the
    +     * first output line whose first space-separated token is "VALUE:".
    +     *
    +     * @param name       the configuration key to look up
    +     * @param extraPaths extra classpath entries for the child JVM
    +     * @param daemon     whether the daemon classpath entries are included
    +     * @return the rest of the "VALUE:" line, or "" when no such line was
    +     *         printed or the child could not be run
    +     */
    +    String confValue (String name, List<String> extraPaths, boolean daemon) {
    +        // The original python code started a jvm with
    +        // backtype.storm.command.config_value as main class and read the
    +        // value from that subprocess' output. Now that the code has been
    +        // ported to java it could read the configuration in-process
    +        // (Utils.loadClojureFn("backtype.storm.config",
    +        // "read-storm-config")), but the python code built a -cp argument
    +        // with classpaths that might not be available to this java
    +        // process, so doing that might break existing scripts. Hence a new
    +        // process is still spawned, with a similarly constructed
    +        // classpath, just like python did.
    +        String value = "";
    +        ProcessBuilder processBuilder = new ProcessBuilder(this.javaCommand,
    +                "-client", this.getConfigOptions(),
    +                "-Dstorm.conf.file=" + this.configFile,
    +                "-cp", this.getClassPath(extraPaths, daemon),
    +                "backtype.storm.command.config_value", name);
    +        // stderr is never parsed; let the child write it straight to our
    +        // stderr so it cannot block on a pipe nobody reads.
    +        processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT);
    +        try {
    +            Process process = processBuilder.start();
    +            try (BufferedReader br = new BufferedReader(
    +                    new InputStreamReader(process.getInputStream(),
    +                            StandardCharsets.UTF_8))) {
    +                boolean found = false;
    +                String line;
    +                // Drain the whole output BEFORE waiting for the exit:
    +                // a child blocked on a full stdout pipe never terminates.
    +                while ((line = br.readLine()) != null) {
    +                    if (found) {
    +                        continue;
    +                    }
    +                    String[] tokens = line.split(" ");
    +                    if ("VALUE:".equals(tokens[0])) {
    +                        value = StringUtils.join(Arrays.copyOfRange(
    +                                tokens, 1, tokens.length), " ");
    +                        found = true;
    +                    }
    +                }
    +            }
    +            process.waitFor();
    +        } catch (InterruptedException ex) {
    +            // Keep the interrupt visible to the caller.
    +            Thread.currentThread().interrupt();
    +            System.out.println("Exception occured while starting process " +
    +                    "via processbuilder " + ex.getMessage());
    +        } catch (Exception ex) {
    +            System.out.println("Exception occured while starting process " +
    +                    "via processbuilder " + ex.getMessage());
    +        }
    +        return value;
    +    }
    +
    +    void executeStormClass (String className, String jvmType, List<String>
    +            jvmOptions, List<String> extraJars, List<String> args, boolean
    +            fork, boolean daemon, String daemonName) {
    +        List<String> extraPaths = new ArrayList<>();
    +        extraPaths.add(this.clusterConfDirectory);
    +        String stormLogDirectory = this.confValue("storm.log.dir",
    +                extraPaths, daemon);
    +        if ((stormLogDirectory == null) || ("".equals(stormLogDirectory)) 
||
    +                ("nil".equals(stormLogDirectory))) {
    +            stormLogDirectory = this.stormHomeDirectory + 
this.fileSeparator
    +                    + "logs";
    +        }
    +        List<String> commandList = new ArrayList<String>();
    +        commandList.add(this.javaCommand);
    +        commandList.add(jvmType);
    +        commandList.add("-Ddaemon.name=" + daemonName);
    +        commandList.add(this.getConfigOptions());
    +        commandList.add("-Dstorm.home=" + this.stormHomeDirectory);
    +        commandList.add("-Dstorm.log.dir=" + stormLogDirectory);
    +        commandList.add("-Djava.library.path=" + this
    +                .confValue("java.library.path", extraJars, daemon));
    +        commandList.add("-Dstorm.conf.file=" + this.configFile);
    +        commandList.add("-cp");
    +        commandList.add(this.getClassPath(extraJars, daemon));
    +        commandList.addAll(jvmOptions);
    +        commandList.add(className);
    +        commandList.addAll(args);
    +        ProcessBuilder processBuilder = new ProcessBuilder(commandList);
    +        processBuilder.inheritIO();
    +        try {
    +            Process process = processBuilder.start();
    +            System.out.println("Executing the command: ");
    +            String commandLine = StringUtils.join(commandList, " ");
    +            System.out.println(commandLine);
    +            if (daemon == true) {
    +                Runtime.getRuntime().addShutdownHook(new ShutdownHookThread
    +                        (process, commandLine));
    +            }
    +            System.out.println("Waiting for subprocess to finish");
    +            process.waitFor();
    +            System.out.println("subprocess finished");
    +            System.out.println("Exit value from subprocess is :" + process
    +                    .exitValue());
    +        } catch (Exception ex) {
    +            System.out.println("Exception occured while starting process 
via " +
    +                    "processbuilder " + ex.getMessage());
    +        }
    --- End diff --
    
    Not all processes need waitFor(). Maybe you could add an option for that, like this:
    
        public static java.lang.Process launch_process(final String command, 
final Map<String, String> environment, boolean backend) throws IOException {
    
            if (backend == true) {
                new Thread(new Runnable() {
    
                    @Override
                    public void run() {
                        String[] cmdlist = (new String("nohup " + command + " 
&")).split(" ");
                        try {
                            launchProcess(cmdlist, environment);
                        } catch (IOException e) {
                            LOG.error("Failed to run " + command + ":" + 
e.getCause(), e);
                        }
                    }
                }).start();
                return null;
            } else {
                String[] cmdlist = command.split(" ");
                return launchProcess(cmdlist, environment);
            }
        }


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to