[
https://issues.apache.org/jira/browse/STORM-904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15010812#comment-15010812
]
ASF GitHub Bot commented on STORM-904:
--------------------------------------
Github user hustfxj commented on a diff in the pull request:
https://github.com/apache/storm/pull/662#discussion_r45188898
--- Diff: storm-core/src/jvm/backtype/storm/utils/StormCommandExecutor.java
---
@@ -0,0 +1,868 @@
+package backtype.storm.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+import clojure.lang.IFn;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.SystemUtils;
+
+/**
+ * Created by pshah on 7/17/15.
+ */
+abstract class StormCommandExecutor {
+ final String NIMBUS_CLASS = "backtype.storm.daemon.nimbus";
+ final String SUPERVISOR_CLASS = "backtype.storm.daemon.supervisor";
+ final String UI_CLASS = "backtype.storm.ui.core";
+ final String LOGVIEWER_CLASS = "backtype.storm.daemon.logviewer";
+ final String DRPC_CLASS = "backtype.storm.daemon.drpc";
+ final String REPL_CLASS = "clojure.main";
+ final String ACTIVATE_CLASS = "backtype.storm.command.activate";
+ final String DEACTIVATE_CLASS = "backtype.storm.command.deactivate";
+ final String REBALANCE_CLASS = "backtype.storm.command.rebalance";
+ final String LIST_CLASS = "backtype.storm.command.list";
+ final String DEVZOOKEEPER_CLASS =
"backtype.storm.command.dev_zookeeper";
+ final String VERSION_CLASS = "backtype.storm.utils.VersionInfo";
+ final String MONITOR_CLASS = "backtype.storm.command.monitor";
+ final String UPLOADCREDENTIALS_CLASS = "backtype.storm.command" +
+ ".upload_credentials";
+ final String GETERRORS_CLASS = "backtype.storm.command.get_errors";
+ final String SHELL_CLASS = "backtype.storm.command.shell_submission";
+ String stormHomeDirectory;
+ String userConfDirectory;
+ String stormConfDirectory;
+ String clusterConfDirectory;
+ String stormLibDirectory;
+ String stormBinDirectory;
+ String stormLog4jConfDirectory;
+ String configFile = "";
+ String javaCommand;
+ List<String> configOptions = new ArrayList<String>();
+ String stormExternalClasspath;
+ String stormExternalClasspathDaemon;
+ String fileSeparator;
+ final List<String> COMMANDS = Arrays.asList("jar", "kill", "shell",
+ "nimbus", "ui", "logviewer", "drpc", "supervisor",
+ "localconfvalue", "remoteconfvalue", "repl", "classpath",
+ "activate", "deactivate", "rebalance", "help", "list",
+ "dev-zookeeper", "version", "monitor", "upload-credentials",
+ "get-errors");
+
+ public static void main (String[] args) {
+ for (String arg : args) {
+ System.out.println("Argument ++ is " + arg);
+ }
+ StormCommandExecutor stormCommandExecutor;
+ if (System.getProperty("os.name").startsWith("Windows")) {
+ stormCommandExecutor = new WindowsStormCommandExecutor();
+ } else {
+ stormCommandExecutor = new UnixStormCommandExecutor();
+ }
+ stormCommandExecutor.initialize();
+ stormCommandExecutor.execute(args);
+ }
+
+ StormCommandExecutor () {
+
+ }
+
+ abstract void initialize ();
+
+ abstract void execute (String[] args);
+
+ void callMethod (String command, List<String> args) {
+ Class implementation = this.getClass();
+ String methodName = command.replace("-", "") + "Command";
+ try {
+ Method method = implementation.getDeclaredMethod(methodName,
List
+ .class);
+ method.invoke(this, args);
+ } catch (NoSuchMethodException ex) {
+ System.out.println("No such method exception occured while
trying" +
+ " to run storm method " + command);
+ } catch (IllegalAccessException ex) {
+ System.out.println("Illegal access exception occured while
trying" +
+ " to run storm method " + command);
+ } catch (IllegalArgumentException ex) {
+ System.out.println("Illegal argument exception occured while "
+
+ "trying" + " to run storm method " + command);
+ } catch (InvocationTargetException ex) {
+ System.out.println("Invocation target exception occured while
" +
+ "trying" + " to run storm method " + command);
+ }
+ }
+}
+
+class UnixStormCommandExecutor extends StormCommandExecutor {
+
+ UnixStormCommandExecutor () {
+
+ }
+
+ void initialize () {
+ Collections.sort(this.COMMANDS);
+ this.fileSeparator = System .getProperty ("file.separator");
+ this.stormHomeDirectory = System.getenv("STORM_BASE_DIR");
+ this.userConfDirectory = System.getProperty("user.home") +
+ this.fileSeparator + "" +
+ ".storm";
+ this.stormConfDirectory = System.getenv("STORM_CONF_DIR");
+ this.clusterConfDirectory = this.stormConfDirectory == null ?
(this
+ .stormHomeDirectory + this.fileSeparator + "conf") : this
+ .stormConfDirectory;
+ File f = new File(this.userConfDirectory + this.fileSeparator +
+ "storm.yaml");
+ if (!f.isFile()) {
+ this.userConfDirectory = this.clusterConfDirectory;
+ }
+ this.stormLibDirectory = this.stormHomeDirectory +
this.fileSeparator +
+ "lib";
+ this.stormBinDirectory = this.stormHomeDirectory +
this.fileSeparator +
+ "bin";
+ this.stormLog4jConfDirectory = this.stormHomeDirectory +
+ this.fileSeparator + "log4j2";
+ if (System.getenv("JAVA_HOME") != null) {
+ this.javaCommand = System.getenv("JAVA_HOME") +
this.fileSeparator +
+ "bin" + this.fileSeparator + "java";
+ if (!(new File(this.javaCommand).exists())) {
+ System.out.println("ERROR: JAVA_HOME is invalid. Could
not " +
+ "find " + this.javaCommand);
+ System.exit(1);
+ }
+ } else {
+ this.javaCommand = "java";
+ }
+ this.stormExternalClasspath = System.getenv("STORM_EXT_CLASSPATH");
+ this.stormExternalClasspathDaemon = System.getenv
+ ("STORM_EXT_CLASSPATH_DAEMON");
+ if (!(new File(this.stormLibDirectory).exists())) {
+
System.out.println("******************************************");
+ System.out.println("The storm client can only be run from
within " +
+ "a release. " + "You appear to be trying to run the
client" +
+ " from a checkout of Storm's source code.");
+ System.out.println("You can download a Storm release at " +
+ "http://storm-project.net/downloads.html");
+
System.out.println("******************************************");
+ System.exit(1);
+ }
+ //System.getProperties().list(System.out);
+ }
+
+ void execute (String[] args) {
+ if (args.length == 0) {
+ this.printUsage();
+ System.exit(-1);
+ }
+ List<String> commandArgs = new ArrayList<String>();
+ for (int i = 0; i < args.length; ++i) {
+ if (args[i] == "-c") {
+ this.configOptions.add(args[++i]);
+ } else if (args[i] == "--config") {
+ this.configFile = args[++i];
+ } else {
+ commandArgs.add(args[i]);
+ }
+ }
+ if ((commandArgs.size() == 0) || (!this.COMMANDS.contains
+ (commandArgs.get(0)))) {
+ System.out.println("Unknown command: [storm " +
StringUtils.join
+ (args, " ") + "]");
+ this.printUsage();
+ System.exit(254);
+
+ }
+ this.callMethod(commandArgs.get(0), commandArgs.subList(1,
+ commandArgs.size()));
+
+ }
+
+ String getConfigOptions() {
+ String configOptions = "-Dstorm.options=";
+ //TODO - do urlencode here. python does quote_plus to each
configoption
+ return configOptions + StringUtils.join(this.configOptions, ',');
+
+ }
+
+ List<String> getJarsFull (String directory) {
+ List<String> fullJarFiles = new ArrayList<String>();
+ File file = new File(directory);
+ File[] files = file.listFiles();
+ if (files != null) {
+ for (File f : files) {
+ if (f.getName().endsWith(".jar")) {
+ fullJarFiles.add(f.getPath());
+ }
+ }
+ }
+ return fullJarFiles;
+ }
+
+ String getClassPath (List<String> extraJars, boolean daemon) {
+ List<String> classPaths =
this.getJarsFull(this.stormHomeDirectory);
+ classPaths.addAll(this.getJarsFull(this.stormLibDirectory));
+ classPaths.addAll(this.getJarsFull(this.stormHomeDirectory + this
+ .fileSeparator + "extlib"));
+ if (daemon == true) {
+ classPaths.addAll(this.getJarsFull(this.stormHomeDirectory +
this
+ .fileSeparator + "extlib-daemon"));
+ }
+ if (this.stormExternalClasspath != null) {
+ classPaths.add(this.stormExternalClasspath);
+ }
+ if (this.stormExternalClasspathDaemon != null) {
+ classPaths.add(this.stormExternalClasspathDaemon);
+ }
+ classPaths.addAll(extraJars);
+ return StringUtils.join(classPaths, System.getProperty("path" +
+ ".separator"));
+ }
+
+ String confValue (String name, List<String> extraPaths, boolean
daemon) {
+ // The original code from python started a process that started a
jvm
+ // with backtype.storm.command.config_value main method that would
+ // read the conf value and print it out to an output stream. python
+ // tapped on to the output stream of that subprocess and returned
the
+ // confvalue for the name. Because the pythong code has been
shipped
+ // to java now it should not spawn a new process which is a jvm
since
+ // we are already in jvm. Instead it should just be doing as the
code
+ // commeneted below.
+ // However looking at the pythong code it was
+ // starting a jvm with -cp argument that had classpaths which might
+ // not be available to this java process. Hence there is a chance
+ // that the below code might break existing scripts. As a result I
+ // have decided to still spawn a new process from java just like
+ // python with similar classpaths being constructed for the jvm
+ // execution
+ /*IFn fn = Utils.loadClojureFn("backtype.storm.config",
+ "read-storm-config");
+ Object o = fn.invoke();
+ return ((Map) o).get(name).toString();*/
+ String confValue = "";
+ ProcessBuilder processBuilder = new
ProcessBuilder(this.javaCommand,
+ "-client", this.getConfigOptions(), "-Dstorm.conf.file=" +
+ this.configFile, "-cp", this.getClassPath(extraPaths,
daemon),
+ "backtype.storm.command.config_value", name);
+ BufferedReader br;
+ try {
+ Process process = processBuilder.start();
+ br = new BufferedReader(new InputStreamReader(process
+ .getInputStream(), StandardCharsets.UTF_8));
+ process.waitFor();
+ String line;
+ while ((line = br.readLine()) != null) {
+ String[] tokens = line.split(" ");
+ if ("VALUE:".equals(tokens[0])) {
+ confValue =
StringUtils.join(Arrays.copyOfRange(tokens, 1,
+ tokens.length), " ");
+ break;
+ }
+ }
+ br.close();
+ } catch (Exception ex) {
+ System.out.println("Exception occured while starting process
via " +
+ "processbuilder " + ex.getMessage());
+ }
+ return confValue;
+ }
+
+ void executeStormClass (String className, String jvmType, List<String>
+ jvmOptions, List<String> extraJars, List<String> args, boolean
+ fork, boolean daemon, String daemonName) {
+ List<String> extraPaths = new ArrayList<>();
+ extraPaths.add(this.clusterConfDirectory);
+ String stormLogDirectory = this.confValue("storm.log.dir",
+ extraPaths, daemon);
+ if ((stormLogDirectory == null) || ("".equals(stormLogDirectory))
||
+ ("nil".equals(stormLogDirectory))) {
+ stormLogDirectory = this.stormHomeDirectory +
this.fileSeparator
+ + "logs";
+ }
+ List<String> commandList = new ArrayList<String>();
+ commandList.add(this.javaCommand);
+ commandList.add(jvmType);
+ commandList.add("-Ddaemon.name=" + daemonName);
+ commandList.add(this.getConfigOptions());
+ commandList.add("-Dstorm.home=" + this.stormHomeDirectory);
+ commandList.add("-Dstorm.log.dir=" + stormLogDirectory);
+ commandList.add("-Djava.library.path=" + this
+ .confValue("java.library.path", extraJars, daemon));
+ commandList.add("-Dstorm.conf.file=" + this.configFile);
+ commandList.add("-cp");
+ commandList.add(this.getClassPath(extraJars, daemon));
+ commandList.addAll(jvmOptions);
+ commandList.add(className);
+ commandList.addAll(args);
+ ProcessBuilder processBuilder = new ProcessBuilder(commandList);
+ processBuilder.inheritIO();
+ try {
+ Process process = processBuilder.start();
+ System.out.println("Executing the command: ");
+ String commandLine = StringUtils.join(commandList, " ");
+ System.out.println(commandLine);
+ if (daemon == true) {
+ Runtime.getRuntime().addShutdownHook(new ShutdownHookThread
+ (process, commandLine));
+ }
+ System.out.println("Waiting for subprocess to finish");
+ process.waitFor();
+ System.out.println("subprocess finished");
+ System.out.println("Exit value from subprocess is :" + process
+ .exitValue());
+ } catch (Exception ex) {
+ System.out.println("Exception occured while starting process
via " +
+ "processbuilder " + ex.getMessage());
+ }
--- End diff --
not all process need waitfor(). maybe you can add option. Like this:
public static java.lang.Process launch_process(final String command,
final Map<String, String> environment, boolean backend) throws IOException {
if (backend == true) {
new Thread(new Runnable() {
@Override
public void run() {
String[] cmdlist = (new String("nohup " + command + "
&")).split(" ");
try {
launchProcess(cmdlist, environment);
} catch (IOException e) {
LOG.error("Failed to run " + command + ":" +
e.getCause(), e);
}
}
}).start();
return null;
} else {
String[] cmdlist = command.split(" ");
return launchProcess(cmdlist, environment);
}
}
> move storm bin commands to java and provide appropriate bindings for windows
> and linux
> --------------------------------------------------------------------------------------
>
> Key: STORM-904
> URL: https://issues.apache.org/jira/browse/STORM-904
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core
> Reporter: Sriharsha Chintalapani
> Assignee: Priyank Shah
>
> Currently we have python and .cmd implementation for windows. This is
> becoming increasing difficult upkeep both versions. Lets make all the main
> code of starting daemons etc. to java and provider wrapper scripts in shell
> and batch for linux and windows respectively.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)