http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Constants.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/Constants.java b/jstorm-core/src/main/java/backtype/storm/Constants.java index 2797b69..da2d5b7 100755 --- a/jstorm-core/src/main/java/backtype/storm/Constants.java +++ b/jstorm-core/src/main/java/backtype/storm/Constants.java @@ -20,9 +20,8 @@ package backtype.storm; import backtype.storm.coordination.CoordinatedBolt; import clojure.lang.RT; - public class Constants { - public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; + public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream"; public static final long SYSTEM_TASK_ID = -1; public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]"); @@ -32,6 +31,6 @@ public class Constants { public static final String METRICS_STREAM_ID = "__metrics"; public static final String METRICS_TICK_STREAM_ID = "__metrics_tick"; public static final String CREDENTIALS_CHANGED_STREAM_ID = "__credentials"; - + public static final String JSTORM_CONF_DIR = "JSTORM_CONF_DIR"; }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java b/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java index 9319ce1..88adfb4 100755 --- a/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java +++ b/jstorm-core/src/main/java/backtype/storm/GenericOptionsParser.java @@ -108,94 +108,92 @@ import org.yaml.snakeyaml.Yaml; public class GenericOptionsParser { static final Logger LOG = LoggerFactory.getLogger(GenericOptionsParser.class); - + static final Charset UTF8 = Charset.forName("UTF-8"); - + public static final String TOPOLOGY_LIB_PATH = "topology.lib.path"; - + public static final String TOPOLOGY_LIB_NAME = "topology.lib.name"; - + Config conf; - + CommandLine commandLine; - + // Order in this map is important for these purposes: // - configuration priority static final LinkedHashMap<String, OptionProcessor> optionProcessors = new LinkedHashMap<String, OptionProcessor>(); - + public GenericOptionsParser(Config conf, String[] args) throws ParseException { this(conf, new Options(), args); } - + public GenericOptionsParser(Config conf, Options options, String[] args) throws ParseException { this.conf = conf; parseGeneralOptions(options, conf, args); } - + public String[] getRemainingArgs() { return commandLine.getArgs(); } - + public Config getConfiguration() { return conf; } - + static Options buildGeneralOptions(Options opts) { Options r = new Options(); - + for (Object o : opts.getOptions()) r.addOption((Option) o); - - Option libjars = OptionBuilder.withArgName("paths").hasArg().withDescription("comma separated jars to be used by the submitted topology").create("libjars"); + + Option libjars = + OptionBuilder.withArgName("paths").hasArg().withDescription("comma separated jars to be used by the submitted 
topology").create("libjars"); r.addOption(libjars); optionProcessors.put("libjars", new LibjarsProcessor()); - + Option conf = OptionBuilder.withArgName("configuration file").hasArg().withDescription("an application configuration file").create("conf"); r.addOption(conf); optionProcessors.put("conf", new ConfFileProcessor()); - + // Must come after `conf': this option is of higher priority Option extraConfig = OptionBuilder.withArgName("D").hasArg().withDescription("extra configurations (preserving types)").create("D"); r.addOption(extraConfig); optionProcessors.put("D", new ExtraConfigProcessor()); - + return r; } - + void parseGeneralOptions(Options opts, Config conf, String[] args) throws ParseException { opts = buildGeneralOptions(opts); CommandLineParser parser = new GnuParser(); commandLine = parser.parse(opts, args, true); processGeneralOptions(conf, commandLine); } - + void processGeneralOptions(Config conf, CommandLine commandLine) throws ParseException { for (Map.Entry<String, OptionProcessor> e : optionProcessors.entrySet()) if (commandLine.hasOption(e.getKey())) e.getValue().process(conf, commandLine); } - + static List<File> validateFiles(String pathList) throws IOException { List<File> l = new ArrayList<File>(); - + for (String s : pathList.split(",")) { File file = new File(s); if (!file.exists()) throw new FileNotFoundException("File `" + file.getAbsolutePath() + "' does not exist"); - + l.add(file); } - + return l; } - + public static void printGenericCommandUsage(PrintStream out) { String[] strs = - new String[] { - "Generic options supported are", - " -conf <conf.xml> load configurations from", - " <conf.xml>", - " -conf <conf.yaml> load configurations from", + new String[] { "Generic options supported are", " -conf <conf.xml> load configurations from", + " <conf.xml>", " -conf <conf.yaml> load configurations from", " <conf.yaml>", " -D <key>=<value> set <key> in configuration", " to <value> (preserve value's type)", @@ -205,11 +203,11 @@ public 
class GenericOptionsParser { for (String s : strs) out.println(s); } - + static interface OptionProcessor { public void process(Config conf, CommandLine commandLine) throws ParseException; } - + static class LibjarsProcessor implements OptionProcessor { @Override public void process(Config conf, CommandLine commandLine) throws ParseException { @@ -223,31 +221,31 @@ public class GenericOptionsParser { } conf.put(TOPOLOGY_LIB_PATH, jars); conf.put(TOPOLOGY_LIB_NAME, names); - + } catch (IOException e) { throw new ParseException(e.getMessage()); } } } - + static class ExtraConfigProcessor implements OptionProcessor { static final Yaml yaml = new Yaml(); - + @Override public void process(Config conf, CommandLine commandLine) throws ParseException { for (String s : commandLine.getOptionValues("D")) { String[] keyval = s.split("=", 2); if (keyval.length != 2) throw new ParseException("Invalid option value `" + s + "'"); - + conf.putAll((Map) yaml.load(keyval[0] + ": " + keyval[1])); } } } - + static class ConfFileProcessor implements OptionProcessor { static final Yaml yaml = new Yaml(); - + static Map loadYamlConf(String f) throws IOException { InputStreamReader reader = null; try { @@ -259,13 +257,13 @@ public class GenericOptionsParser { reader.close(); } } - + static Map loadConf(String f) throws IOException { if (f.endsWith(".yaml")) return loadYamlConf(f); throw new IOException("Unknown configuration file type: " + f + " does not end with either .yaml"); } - + @Override public void process(Config conf, CommandLine commandLine) throws ParseException { try { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java b/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java index 1a7bc1b..f8f9e9b 100755 --- 
a/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java +++ b/jstorm-core/src/main/java/backtype/storm/ICredentialsListener.java @@ -26,7 +26,8 @@ import java.util.Map; public interface ICredentialsListener { /** * Called when the credentials of a topology have changed. + * * @param credentials the new credentials, could be null. */ - public void setCredentials(Map<String,String> credentials); + public void setCredentials(Map<String, String> credentials); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java b/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java index 7d5aa35..7d31f07 100755 --- a/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java +++ b/jstorm-core/src/main/java/backtype/storm/ILocalCluster.java @@ -30,20 +30,33 @@ import backtype.storm.generated.Credentials; import java.util.Map; - public interface ILocalCluster { void submitTopology(String topologyName, Map conf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException; - void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, InvalidTopologyException; + + void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) throws AlreadyAliveException, + InvalidTopologyException; + void uploadNewCredentials(String topologyName, Credentials creds); + void killTopology(String topologyName) throws NotAliveException; + void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException; + void activate(String topologyName) throws NotAliveException; + void deactivate(String topologyName) throws NotAliveException; + void rebalance(String name, RebalanceOptions options) throws NotAliveException; + void 
shutdown(); + String getTopologyConf(String id); + StormTopology getTopology(String id); + ClusterSummary getClusterInfo(); + TopologyInfo getTopologyInfo(String id); + Map getState(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java b/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java index e478dca..4482ecd 100755 --- a/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java +++ b/jstorm-core/src/main/java/backtype/storm/ILocalDRPC.java @@ -21,7 +21,6 @@ import backtype.storm.daemon.Shutdownable; import backtype.storm.generated.DistributedRPC; import backtype.storm.generated.DistributedRPCInvocations; - public interface ILocalDRPC extends DistributedRPC.Iface, DistributedRPCInvocations.Iface, Shutdownable { - public String getServiceId(); + public String getServiceId(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalCluster.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/LocalCluster.java b/jstorm-core/src/main/java/backtype/storm/LocalCluster.java index b55bac4..c25c260 100755 --- a/jstorm-core/src/main/java/backtype/storm/LocalCluster.java +++ b/jstorm-core/src/main/java/backtype/storm/LocalCluster.java @@ -17,30 +17,21 @@ */ package backtype.storm; -import java.util.Map; - +import backtype.storm.generated.*; +import backtype.storm.utils.Utils; +import com.alibaba.jstorm.utils.JStormUtils; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.generated.ClusterSummary; -import backtype.storm.generated.Credentials; -import backtype.storm.generated.KillOptions; -import backtype.storm.generated.NotAliveException; -import 
backtype.storm.generated.RebalanceOptions; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.SubmitOptions; -import backtype.storm.generated.TopologyInfo; -import backtype.storm.utils.Utils; - -import com.alibaba.jstorm.utils.JStormUtils; +import java.util.Map; public class LocalCluster implements ILocalCluster { - + public static Logger LOG = LoggerFactory.getLogger(LocalCluster.class); - + private LocalClusterMap state; - + protected void setLogger() { // the code is for log4j // boolean needReset = true; @@ -56,61 +47,62 @@ public class LocalCluster implements ILocalCluster { // BasicConfigurator.configure(); // rootLogger.setLevel(Level.INFO); // } - + } - + // this is easy to debug protected static LocalCluster instance = null; - + public static LocalCluster getInstance() { return instance; } - + public LocalCluster() { synchronized (LocalCluster.class) { if (instance != null) { throw new RuntimeException("LocalCluster should be single"); } setLogger(); - + // fix in zk occur Address family not supported by protocol family: // connect System.setProperty("java.net.preferIPv4Stack", "true"); - + this.state = LocalUtils.prepareLocalCluster(); if (this.state == null) throw new RuntimeException("prepareLocalCluster error"); - + instance = this; } } - + @Override public void submitTopology(String topologyName, Map conf, StormTopology topology) { submitTopologyWithOpts(topologyName, conf, topology, null); } - + @Override public void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts) { // TODO Auto-generated method stub if (!Utils.isValidConf(conf)) throw new RuntimeException("Topology conf is not json-serializable"); JStormUtils.setLocalMode(true); - + conf.put(Config.STORM_CLUSTER_MODE, "local"); + try { if (submitOpts == null) { state.getNimbus().submitTopology(topologyName, null, Utils.to_json(conf), topology); } else { state.getNimbus().submitTopologyWithOpts(topologyName, null, 
Utils.to_json(conf), topology, submitOpts); } - + } catch (Exception e) { // TODO Auto-generated catch block LOG.error("Failed to submit topology " + topologyName, e); throw new RuntimeException(e); } } - + @Override public void killTopology(String topologyName) { // TODO Auto-generated method stub @@ -124,7 +116,7 @@ public class LocalCluster implements ILocalCluster { LOG.error("fail to kill Topology " + topologyName, e); } } - + @Override public void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException { // TODO Auto-generated method stub @@ -136,7 +128,7 @@ public class LocalCluster implements ILocalCluster { throw new RuntimeException(e); } } - + @Override public void activate(String topologyName) { // TODO Auto-generated method stub @@ -148,7 +140,7 @@ public class LocalCluster implements ILocalCluster { throw new RuntimeException(e); } } - + @Override public void deactivate(String topologyName) { // TODO Auto-generated method stub @@ -160,7 +152,7 @@ public class LocalCluster implements ILocalCluster { throw new RuntimeException(e); } } - + @Override public void rebalance(String name, RebalanceOptions options) { // TODO Auto-generated method stub @@ -172,7 +164,7 @@ public class LocalCluster implements ILocalCluster { throw new RuntimeException(e); } } - + @Override public void shutdown() { // TODO Auto-generated method stub @@ -180,8 +172,9 @@ public class LocalCluster implements ILocalCluster { // it take 10 seconds to remove topology's node JStormUtils.sleepMs(10 * 1000); this.state.clean(); + instance = null; } - + @Override public String getTopologyConf(String id) { // TODO Auto-generated method stub @@ -193,7 +186,7 @@ public class LocalCluster implements ILocalCluster { } return null; } - + @Override public StormTopology getTopology(String id) { // TODO Auto-generated method stub @@ -208,7 +201,7 @@ public class LocalCluster implements ILocalCluster { } return null; } - + @Override public ClusterSummary getClusterInfo() { // 
TODO Auto-generated method stub @@ -220,7 +213,7 @@ public class LocalCluster implements ILocalCluster { } return null; } - + @Override public TopologyInfo getTopologyInfo(String id) { // TODO Auto-generated method stub @@ -235,7 +228,7 @@ public class LocalCluster implements ILocalCluster { } return null; } - + /*** * You should use getLocalClusterMap() to instead.This function will always return null * */ @@ -245,11 +238,11 @@ public class LocalCluster implements ILocalCluster { // TODO Auto-generated method stub return null; } - + public LocalClusterMap getLocalClusterMap() { return state; } - + public static void main(String[] args) throws Exception { LocalCluster localCluster = null; try { @@ -269,7 +262,7 @@ public class LocalCluster implements ILocalCluster { } catch (Exception e) { // TODO Auto-generated catch block LOG.error("fail to uploadNewCredentials of topologyId: " + topologyName, e); - } + } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java b/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java index bd99c76..39f23be 100755 --- a/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java +++ b/jstorm-core/src/main/java/backtype/storm/LocalClusterMap.java @@ -31,83 +31,83 @@ import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.zk.Factory; public class LocalClusterMap { - + public static Logger LOG = LoggerFactory.getLogger(LocalClusterMap.class); - + private NimbusServer nimbusServer; - + private ServiceHandler nimbus; - + private Factory zookeeper; - + private Map conf; - + private List<String> tmpDir; - + private SupervisorManger supervisor; - + public ServiceHandler getNimbus() { return nimbus; } - + public void setNimbus(ServiceHandler nimbus) { this.nimbus = nimbus; } - + public Factory 
getZookeeper() { return zookeeper; } - + public void setZookeeper(Factory zookeeper) { this.zookeeper = zookeeper; } - + public Map getConf() { return conf; } - + public void setConf(Map conf) { this.conf = conf; } - + public NimbusServer getNimbusServer() { return nimbusServer; } - + public void setNimbusServer(NimbusServer nimbusServer) { this.nimbusServer = nimbusServer; } - + public SupervisorManger getSupervisor() { return supervisor; } - + public void setSupervisor(SupervisorManger supervisor) { this.supervisor = supervisor; } - + public List<String> getTmpDir() { return tmpDir; } - + public void setTmpDir(List<String> tmpDir) { this.tmpDir = tmpDir; } - + public void clean() { - + if (supervisor != null) { supervisor.ShutdownAllWorkers(); supervisor.shutdown(); } - + if (nimbusServer != null) { nimbusServer.cleanup(); } - + if (zookeeper != null) zookeeper.shutdown(); - + // it will hava a problem: // java.io.IOException: Unable to delete file: // {TmpPath}\{UUID}\version-2\log.1 @@ -122,5 +122,5 @@ public class LocalClusterMap { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java b/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java index 4113bf4..a838026 100755 --- a/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java +++ b/jstorm-core/src/main/java/backtype/storm/LocalDRPC.java @@ -28,16 +28,16 @@ import com.alibaba.jstorm.drpc.Drpc; public class LocalDRPC implements ILocalDRPC { private static final Logger LOG = LoggerFactory.getLogger(LocalDRPC.class); - + private Drpc handler = new Drpc(); private Thread thread; - + private final String serviceId; - + public LocalDRPC() { - + thread = new Thread(new Runnable() { - + @Override public void run() { LOG.info("Begin to init local Drpc"); @@ -51,10 +51,10 @@ public class LocalDRPC 
implements ILocalDRPC { } }); thread.start(); - + serviceId = ServiceRegistry.registerService(handler); } - + @Override public String execute(String functionName, String funcArgs) { // TODO Auto-generated method stub @@ -65,36 +65,36 @@ public class LocalDRPC implements ILocalDRPC { throw new RuntimeException(e); } } - + @Override public void result(String id, String result) throws TException { // TODO Auto-generated method stub handler.result(id, result); } - + @Override public DRPCRequest fetchRequest(String functionName) throws TException { // TODO Auto-generated method stub return handler.fetchRequest(functionName); } - + @Override public void failRequest(String id) throws TException { // TODO Auto-generated method stub handler.failRequest(id); } - + @Override public void shutdown() { // TODO Auto-generated method stub ServiceRegistry.unregisterService(this.serviceId); this.handler.shutdown(); } - + @Override public String getServiceId() { // TODO Auto-generated method stub return serviceId; } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/LocalUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/LocalUtils.java b/jstorm-core/src/main/java/backtype/storm/LocalUtils.java index e32c07e..6e5023b 100755 --- a/jstorm-core/src/main/java/backtype/storm/LocalUtils.java +++ b/jstorm-core/src/main/java/backtype/storm/LocalUtils.java @@ -39,32 +39,32 @@ import com.alibaba.jstorm.zk.Factory; import com.alibaba.jstorm.zk.Zookeeper; public class LocalUtils { - + public static Logger LOG = LoggerFactory.getLogger(LocalUtils.class); - + public static LocalClusterMap prepareLocalCluster() { LocalClusterMap state = new LocalClusterMap(); try { List<String> tmpDirs = new ArrayList(); - + String zkDir = getTmpDir(); tmpDirs.add(zkDir); Factory zookeeper = startLocalZookeeper(zkDir); Map conf = 
getLocalConf(zookeeper.getZooKeeperServer().getClientPort()); - + String nimbusDir = getTmpDir(); tmpDirs.add(nimbusDir); Map nimbusConf = deepCopyMap(conf); nimbusConf.put(Config.STORM_LOCAL_DIR, nimbusDir); NimbusServer instance = new NimbusServer(); - + Map supervisorConf = deepCopyMap(conf); String supervisorDir = getTmpDir(); tmpDirs.add(supervisorDir); supervisorConf.put(Config.STORM_LOCAL_DIR, supervisorDir); Supervisor supervisor = new Supervisor(); IContext context = getLocalContext(supervisorConf); - + state.setNimbusServer(instance); state.setNimbus(instance.launcherLocalServer(nimbusConf, new DefaultInimbus())); state.setZookeeper(zookeeper); @@ -75,11 +75,11 @@ public class LocalUtils { } catch (Exception e) { LOG.error("prepare cluster error!", e); state.clean(); - + } return null; } - + private static Factory startLocalZookeeper(String tmpDir) { for (int i = 2000; i < 65535; i++) { try { @@ -90,11 +90,11 @@ public class LocalUtils { } throw new RuntimeException("No port is available to launch an inprocess zookeeper."); } - + private static String getTmpDir() { return System.getProperty("java.io.tmpdir") + File.separator + UUID.randomUUID(); } - + private static Map getLocalConf(int port) { List<String> zkServers = new ArrayList<String>(1); zkServers.add("localhost"); @@ -110,7 +110,7 @@ public class LocalUtils { ConfigExtension.setTaskCleanupTimeoutSec(conf, 0); return conf; } - + private static IContext getLocalContext(Map conf) { if (!(Boolean) conf.get(Config.STORM_LOCAL_MODE_ZMQ)) { IContext result = new NettyContext(); @@ -120,7 +120,7 @@ public class LocalUtils { } return null; } - + private static Map deepCopyMap(Map map) { return new HashMap(map); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java 
b/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java index 400875e..1666b29 100644 --- a/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java +++ b/jstorm-core/src/main/java/backtype/storm/StormSubmitter.java @@ -17,6 +17,14 @@ */ package backtype.storm; +import backtype.storm.generated.*; +import backtype.storm.utils.BufferFileInputStream; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -24,26 +32,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.InvalidTopologyException; -import backtype.storm.generated.Nimbus; -import backtype.storm.generated.NotAliveException; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.SubmitOptions; -import backtype.storm.generated.TopologyAssignException; -import backtype.storm.utils.BufferFileInputStream; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; - /** - * Use this class to submit topologies to run on the Storm cluster. You should - * run your program with the "storm jar" command from the command-line, and then - * use this class to submit your topologies. + * Use this class to submit topologies to run on the Storm cluster. You should run your program with the "storm jar" command from the command-line, and then use + * this class to submit your topologies. */ public class StormSubmitter { public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class); @@ -55,25 +46,20 @@ public class StormSubmitter { } /** - * Submits a topology to run on the cluster. 
A topology runs forever or - * until explicitly killed. + * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. * * * @param name the name of the storm. * @param stormConf the topology-specific configuration. See {@link Config}. * @param topology the processing to execute. - * @throws AlreadyAliveException if a topology with this name is already - * running + * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted */ - public static void submitTopology(String name, Map stormConf, - StormTopology topology) throws AlreadyAliveException, - InvalidTopologyException { + public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException { submitTopology(name, stormConf, topology, null); } - public static void submitTopology(String name, Map stormConf, - StormTopology topology, SubmitOptions opts, List<File> jarFiles) + public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, List<File> jarFiles) throws AlreadyAliveException, InvalidTopologyException { if (jarFiles == null) { jarFiles = new ArrayList<File>(); @@ -83,8 +69,7 @@ public class StormSubmitter { for (File f : jarFiles) { if (!f.exists()) { - LOG.info(f.getName() + " is not existed: " - + f.getAbsolutePath()); + LOG.info(f.getName() + " is not existed: " + f.getAbsolutePath()); continue; } jars.put(f.getName(), f.getAbsolutePath()); @@ -96,32 +81,25 @@ public class StormSubmitter { submitTopology(name, stormConf, topology, opts); } - public static void submitTopology(String name, Map stormConf, - StormTopology topology, SubmitOptions opts, - ProgressListener listener) throws AlreadyAliveException, - InvalidTopologyException { + public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener 
listener) + throws AlreadyAliveException, InvalidTopologyException { submitTopology(name, stormConf, topology, opts); } /** - * Submits a topology to run on the cluster. A topology runs forever or - * until explicitly killed. + * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. * * * @param name the name of the storm. * @param stormConf the topology-specific configuration. See {@link Config}. * @param topology the processing to execute. - * @param options to manipulate the starting of the topology - * @throws AlreadyAliveException if a topology with this name is already - * running + * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted */ - public static void submitTopology(String name, Map stormConf, - StormTopology topology, SubmitOptions opts) - throws AlreadyAliveException, InvalidTopologyException { + public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, + InvalidTopologyException { if (!Utils.isValidConf(stormConf)) { - throw new IllegalArgumentException( - "Storm conf is not valid. Must be json-serializable"); + throw new IllegalArgumentException("Storm conf is not valid. 
Must be json-serializable"); } stormConf = new HashMap(stormConf); stormConf.putAll(Utils.readCommandLineOpts()); @@ -137,20 +115,16 @@ public class StormSubmitter { NimbusClient client = NimbusClient.getConfiguredClient(conf); try { if (topologyNameExists(client, conf, name)) { - throw new RuntimeException("Topology with name `" + name - + "` already exists on cluster"); + throw new RuntimeException("Topology with name `" + name + "` already exists on cluster"); } submitJar(client, conf); - LOG.info("Submitting topology " + name - + " in distributed mode with conf " + serConf); + LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); if (opts != null) { - client.getClient().submitTopologyWithOpts(name, path, - serConf, topology, opts); + client.getClient().submitTopologyWithOpts(name, path, serConf, topology, opts); } else { // this is for backwards compatibility - client.getClient().submitTopology(name, path, serConf, - topology); + client.getClient().submitTopology(name, path, serConf, topology); } } finally { client.close(); @@ -173,43 +147,36 @@ public class StormSubmitter { } /** - * Submits a topology to run on the cluster with a progress bar. A topology - * runs forever or until explicitly killed. + * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until explicitly killed. * * * @param name the name of the storm. * @param stormConf the topology-specific configuration. See {@link Config}. * @param topology the processing to execute. 
- * @throws AlreadyAliveException if a topology with this name is already - * running + * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted * @throws TopologyAssignException */ - public static void submitTopologyWithProgressBar(String name, - Map stormConf, StormTopology topology) - throws AlreadyAliveException, InvalidTopologyException { + public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException { submitTopologyWithProgressBar(name, stormConf, topology, null); } /** - * Submits a topology to run on the cluster with a progress bar. A topology - * runs forever or until explicitly killed. + * Submits a topology to run on the cluster with a progress bar. A topology runs forever or until explicitly killed. * * * @param name the name of the storm. * @param stormConf the topology-specific configuration. See {@link Config}. * @param topology the processing to execute. 
* @param opts to manipulate the starting of the topology - * @throws AlreadyAliveException if a topology with this name is already - * running + * @throws AlreadyAliveException if a topology with this name is already running * @throws InvalidTopologyException if an invalid topology was submitted * @throws TopologyAssignException */ - public static void submitTopologyWithProgressBar(String name, - Map stormConf, StormTopology topology, SubmitOptions opts) - throws AlreadyAliveException, InvalidTopologyException { + public static void submitTopologyWithProgressBar(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, + InvalidTopologyException { /** * remove progress bar in jstorm @@ -218,21 +185,11 @@ public class StormSubmitter { } public static boolean topologyNameExists(NimbusClient client, Map conf, String name) { - if (StringUtils.isBlank(name)) { - throw new RuntimeException("TopologyName is empty"); - } - try { - String topologyId = client.getClient().getTopologyId(name); - if (StringUtils.isBlank(topologyId) == false) { - return true; - } - return false; - - } catch (NotAliveException e) { - return false; + client.getClient().getTopologyInfoByName(name); + return true; } catch (Exception e) { - throw new RuntimeException(e); + return false; } } @@ -246,15 +203,9 @@ public class StormSubmitter { String localJar = System.getProperty("storm.jar"); path = client.getClient().beginFileUpload(); String[] pathCache = path.split("/"); - String uploadLocation = - path + "/stormjar-" + pathCache[pathCache.length - 1] - + ".jar"; - List<String> lib = - (List<String>) conf - .get(GenericOptionsParser.TOPOLOGY_LIB_NAME); - Map<String, String> libPath = - (Map<String, String>) conf - .get(GenericOptionsParser.TOPOLOGY_LIB_PATH); + String uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar"; + List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME); + Map<String, String> 
libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH); if (lib != null && lib.size() != 0) { for (String libName : lib) { String jarPath = path + "/lib/" + libName; @@ -265,14 +216,12 @@ public class StormSubmitter { } else { if (localJar == null) { // no lib, no client jar - throw new RuntimeException( - "No client app jar, please upload it"); + throw new RuntimeException("No client app jar, please upload it"); } } if (localJar != null) { - submittedJar = - submitJar(conf, localJar, uploadLocation, client); + submittedJar = submitJar(conf, localJar, uploadLocation, client); } else { // no client jar, but with lib jar client.getClient().finishFileUpload(uploadLocation); @@ -285,36 +234,29 @@ public class StormSubmitter { } } - public static String submitJar(Map conf, String localJar, - String uploadLocation, NimbusClient client) { + public static String submitJar(Map conf, String localJar, String uploadLocation, NimbusClient client) { if (localJar == null) { - throw new RuntimeException( - "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload."); + throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload."); } try { - LOG.info("Uploading topology jar " + localJar - + " to assigned location: " + uploadLocation); + LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation); int bufferSize = 512 * 1024; - Object maxBufSizeObject = - conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE); + Object maxBufSizeObject = conf.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE); if (maxBufSizeObject != null) { bufferSize = Utils.getInt(maxBufSizeObject) / 2; } - BufferFileInputStream is = - new BufferFileInputStream(localJar, bufferSize); + BufferFileInputStream is = new BufferFileInputStream(localJar, bufferSize); while (true) { byte[] toSubmit = is.read(); if (toSubmit.length == 0) break; - 
client.getClient().uploadChunk(uploadLocation, - ByteBuffer.wrap(toSubmit)); + client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit)); } client.getClient().finishFileUpload(uploadLocation); - LOG.info("Successfully uploaded topology jar to assigned location: " - + uploadLocation); + LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation); return uploadLocation; } catch (Exception e) { throw new RuntimeException(e); @@ -350,8 +292,7 @@ public class StormSubmitter { * @param bytesUploaded - number of bytes transferred so far * @param totalBytes - total number of bytes of the file */ - public void onProgress(String srcFile, String targetFile, - long bytesUploaded, long totalBytes); + public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes); /** * called when the file is uploaded @@ -360,7 +301,6 @@ public class StormSubmitter { * @param targetFile - destination file * @param totalBytes - total number of bytes of the file */ - public void onCompleted(String srcFile, String targetFile, - long totalBytes); + public void onCompleted(String srcFile, String targetFile, long totalBytes); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/Tool.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/Tool.java b/jstorm-core/src/main/java/backtype/storm/Tool.java index 6722b24..0dc5e32 100755 --- a/jstorm-core/src/main/java/backtype/storm/Tool.java +++ b/jstorm-core/src/main/java/backtype/storm/Tool.java @@ -58,13 +58,13 @@ package backtype.storm; public abstract class Tool { Config config; - + public abstract int run(String[] args) throws Exception; - + public Config getConf() { return config; } - + public void setConf(Config config) { this.config = config; } 
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/ToolRunner.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/ToolRunner.java b/jstorm-core/src/main/java/backtype/storm/ToolRunner.java index 33f5034..d70da41 100755 --- a/jstorm-core/src/main/java/backtype/storm/ToolRunner.java +++ b/jstorm-core/src/main/java/backtype/storm/ToolRunner.java @@ -32,6 +32,8 @@ import backtype.storm.utils.Utils; * href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a * href="{@docRoot} * to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} + * to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a + * href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} to parse the <a href="{@docRoot} * /backtype/storm/GenericOptionsParser.html#GenericOptions"> generic storm command line arguments</a> and modifies the <code>Config</code> of the * <code>Tool</code>. The application-specific options are passed along without being modified. 
* @@ -41,21 +43,22 @@ import backtype.storm.utils.Utils; public class ToolRunner { static final Logger LOG = LoggerFactory.getLogger(ToolRunner.class); - + public static void run(Tool tool, String[] args) { run(tool.getConf(), tool, args); } - + public static void run(Config conf, Tool tool, String[] args) { try { if (conf == null) { conf = new Config(); conf.putAll(Utils.readStormConfig()); } - + GenericOptionsParser parser = new GenericOptionsParser(conf, args); + LOG.info(conf.toString()); tool.setConf(conf); - + System.exit(tool.run(parser.getRemainingArgs())); } catch (ParseException e) { LOG.error("Error parsing generic options: {}", e.getMessage()); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java index 5de9bde..d3d1d37 100755 --- a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureBolt.java @@ -36,15 +36,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; - public class ClojureBolt implements IRichBolt, FinishedCallback { Map<String, StreamInfo> _fields; List<String> _fnSpec; List<String> _confSpec; List<Object> _params; - + IBolt _bolt; - + public ClojureBolt(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) { _fnSpec = fnSpec; _confSpec = confSpec; @@ -57,21 +56,23 @@ public class ClojureBolt implements IRichBolt, FinishedCallback { IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1)); try { IFn preparer = (IFn) hof.applyTo(RT.seq(_params)); - final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] { - Keyword.intern(Symbol.create("output-collector")), collector, - 
Keyword.intern(Symbol.create("context")), context}); - List<Object> args = new ArrayList<Object>() {{ - add(stormConf); - add(context); - add(collectorMap); - }}; - + final Map<Keyword, Object> collectorMap = + new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector, + Keyword.intern(Symbol.create("context")), context }); + List<Object> args = new ArrayList<Object>() { + { + add(stormConf); + add(context); + add(collectorMap); + } + }; + _bolt = (IBolt) preparer.applyTo(RT.seq(args)); - //this is kind of unnecessary for clojure + // this is kind of unnecessary for clojure try { _bolt.prepare(stormConf, context, collector); - } catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } catch (Exception e) { throw new RuntimeException(e); @@ -85,16 +86,16 @@ public class ClojureBolt implements IRichBolt, FinishedCallback { @Override public void cleanup() { - try { - _bolt.cleanup(); - } catch(AbstractMethodError ame) { - - } + try { + _bolt.cleanup(); + } catch (AbstractMethodError ame) { + + } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _fields.keySet()) { + for (String stream : _fields.keySet()) { StreamInfo info = _fields.get(stream); declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields())); } @@ -102,7 +103,7 @@ public class ClojureBolt implements IRichBolt, FinishedCallback { @Override public void finishedId(Object id) { - if(_bolt instanceof FinishedCallback) { + if (_bolt instanceof FinishedCallback) { ((FinishedCallback) _bolt).finishedId(id); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java index f6422e3..fc231ce 
100755 --- a/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/clojure/ClojureSpout.java @@ -39,37 +39,38 @@ public class ClojureSpout implements IRichSpout { List<String> _fnSpec; List<String> _confSpec; List<Object> _params; - + ISpout _spout; - + public ClojureSpout(List fnSpec, List confSpec, List<Object> params, Map<String, StreamInfo> fields) { _fnSpec = fnSpec; _confSpec = confSpec; _params = params; _fields = fields; } - @Override public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1)); try { IFn preparer = (IFn) hof.applyTo(RT.seq(_params)); - final Map<Keyword,Object> collectorMap = new PersistentArrayMap( new Object[] { - Keyword.intern(Symbol.create("output-collector")), collector, - Keyword.intern(Symbol.create("context")), context}); - List<Object> args = new ArrayList<Object>() {{ - add(conf); - add(context); - add(collectorMap); - }}; - + final Map<Keyword, Object> collectorMap = + new PersistentArrayMap(new Object[] { Keyword.intern(Symbol.create("output-collector")), collector, + Keyword.intern(Symbol.create("context")), context }); + List<Object> args = new ArrayList<Object>() { + { + add(conf); + add(context); + add(collectorMap); + } + }; + _spout = (ISpout) preparer.applyTo(RT.seq(args)); - //this is kind of unnecessary for clojure + // this is kind of unnecessary for clojure try { _spout.open(conf, context, collector); - } catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } catch (Exception e) { throw new RuntimeException(e); @@ -80,8 +81,8 @@ public class ClojureSpout implements IRichSpout { public void close() { try { _spout.close(); - } catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } @@ -89,8 +90,8 @@ public class ClojureSpout implements IRichSpout { public void nextTuple() { try { _spout.nextTuple(); - } 
catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } @@ -99,8 +100,8 @@ public class ClojureSpout implements IRichSpout { public void ack(Object msgId) { try { _spout.ack(msgId); - } catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } @@ -109,20 +110,20 @@ public class ClojureSpout implements IRichSpout { public void fail(Object msgId) { try { _spout.fail(msgId); - } catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _fields.keySet()) { + for (String stream : _fields.keySet()) { StreamInfo info = _fields.get(stream); declarer.declareStream(stream, info.is_direct(), new Fields(info.get_output_fields())); } } - + @Override public Map<String, Object> getComponentConfiguration() { IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1)); @@ -137,8 +138,8 @@ public class ClojureSpout implements IRichSpout { public void activate() { try { _spout.activate(); - } catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } @@ -146,8 +147,8 @@ public class ClojureSpout implements IRichSpout { public void deactivate() { try { _spout.deactivate(); - } catch(AbstractMethodError ame) { - + } catch (AbstractMethodError ame) { + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java index a155008..53136c7 100755 --- a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellBolt.java @@ -26,20 +26,20 @@ import java.util.Map; public class RichShellBolt extends ShellBolt implements IRichBolt { private Map<String, 
StreamInfo> _outputs; - + public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) { super(command); _outputs = outputs; } - + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _outputs.keySet()) { + for (String stream : _outputs.keySet()) { StreamInfo def = _outputs.get(stream); - if(def.is_direct()) { + if (def.is_direct()) { declarer.declareStream(stream, true, new Fields(def.get_output_fields())); } else { - declarer.declareStream(stream, new Fields(def.get_output_fields())); + declarer.declareStream(stream, new Fields(def.get_output_fields())); } } } @@ -47,5 +47,5 @@ public class RichShellBolt extends ShellBolt implements IRichBolt { @Override public Map<String, Object> getComponentConfiguration() { return null; - } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java index b49fbef..2f7a134 100755 --- a/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/clojure/RichShellSpout.java @@ -34,9 +34,9 @@ public class RichShellSpout extends ShellSpout implements IRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for(String stream: _outputs.keySet()) { + for (String stream : _outputs.keySet()) { StreamInfo def = _outputs.get(stream); - if(def.is_direct()) { + if (def.is_direct()) { declarer.declareStream(stream, true, new Fields(def.get_output_fields())); } else { declarer.declareStream(stream, new Fields(def.get_output_fields())); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/activate.java 
---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/activate.java b/jstorm-core/src/main/java/backtype/storm/command/activate.java index ed12e09..11a0db5 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/activate.java +++ b/jstorm-core/src/main/java/backtype/storm/command/activate.java @@ -30,7 +30,7 @@ import backtype.storm.utils.Utils; * */ public class activate { - + /** * @param args */ @@ -39,17 +39,17 @@ public class activate { if (args == null || args.length == 0) { throw new InvalidParameterException("Should input topology name"); } - + String topologyName = args[0]; - + NimbusClient client = null; try { - + Map conf = Utils.readStormConfig(); client = NimbusClient.getConfiguredClient(conf); - + client.getClient().activate(topologyName); - + System.out.println("Successfully submit command activate " + topologyName); } catch (Exception e) { System.out.println(e.getMessage()); @@ -61,5 +61,5 @@ public class activate { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/config_value.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/config_value.java b/jstorm-core/src/main/java/backtype/storm/command/config_value.java index 868ffdc..dd8812a 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/config_value.java +++ b/jstorm-core/src/main/java/backtype/storm/command/config_value.java @@ -30,7 +30,7 @@ import backtype.storm.utils.Utils; * */ public class config_value { - + /** * @param args */ @@ -39,12 +39,12 @@ public class config_value { if (args == null || args.length == 0) { throw new InvalidParameterException("Should input key name"); } - + String key = args[0]; - + Map conf = Utils.readStormConfig(); - + System.out.print("VALUE: " + String.valueOf(conf.get(key))); } - + } 
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/deactivate.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/deactivate.java b/jstorm-core/src/main/java/backtype/storm/command/deactivate.java index 22ac20d..59e97d6 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/deactivate.java +++ b/jstorm-core/src/main/java/backtype/storm/command/deactivate.java @@ -30,7 +30,7 @@ import backtype.storm.utils.Utils; * */ public class deactivate { - + /** * @param args */ @@ -39,17 +39,17 @@ public class deactivate { if (args == null || args.length == 0) { throw new InvalidParameterException("Should input topology name"); } - + String topologyName = args[0]; - + NimbusClient client = null; try { - + Map conf = Utils.readStormConfig(); client = NimbusClient.getConfiguredClient(conf); - + client.getClient().deactivate(topologyName); - + System.out.println("Successfully submit command deactivate " + topologyName); } catch (Exception e) { System.out.println(e.getMessage()); @@ -61,5 +61,5 @@ public class deactivate { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java b/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java index 4ab3893..bda20e1 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java +++ b/jstorm-core/src/main/java/backtype/storm/command/kill_topology.java @@ -17,13 +17,13 @@ */ package backtype.storm.command; -import java.security.InvalidParameterException; -import java.util.Map; - import backtype.storm.generated.KillOptions; import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; +import 
java.security.InvalidParameterException; +import java.util.Map; + /** * Kill topology * @@ -31,7 +31,7 @@ import backtype.storm.utils.Utils; * */ public class kill_topology { - + /** * @param args */ @@ -40,28 +40,28 @@ public class kill_topology { if (args == null || args.length == 0) { throw new InvalidParameterException("Should input topology name"); } - + String topologyName = args[0]; - + NimbusClient client = null; try { - + Map conf = Utils.readStormConfig(); client = NimbusClient.getConfiguredClient(conf); - + if (args.length == 1) { - + client.getClient().killTopology(topologyName); } else { int delaySeconds = Integer.parseInt(args[1]); - + KillOptions options = new KillOptions(); options.set_wait_secs(delaySeconds); - + client.getClient().killTopologyWithOpts(topologyName, options); - + } - + System.out.println("Successfully submit command kill " + topologyName); } catch (Exception e) { System.out.println(e.getMessage()); @@ -73,5 +73,5 @@ public class kill_topology { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/list.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/list.java b/jstorm-core/src/main/java/backtype/storm/command/list.java index 3b4efdb..0c6930d 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/list.java +++ b/jstorm-core/src/main/java/backtype/storm/command/list.java @@ -33,29 +33,29 @@ import backtype.storm.utils.Utils; * */ public class list { - + /** * @param args */ public static void main(String[] args) { - + NimbusClient client = null; try { - + Map conf = Utils.readStormConfig(); client = NimbusClient.getConfiguredClient(conf); - + if (args.length > 0 && StringUtils.isBlank(args[0]) == false) { String topologyName = args[0]; TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName); - + System.out.println("Successfully get topology info \n" + 
Utils.toPrettyJsonString(info)); } else { ClusterSummary clusterSummary = client.getClient().getClusterInfo(); - + System.out.println("Successfully get cluster info \n" + Utils.toPrettyJsonString(clusterSummary)); } - + } catch (Exception e) { System.out.println(e.getMessage()); e.printStackTrace(); @@ -66,5 +66,5 @@ public class list { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java index 6607445..a673fcf 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java +++ b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java @@ -17,13 +17,13 @@ */ package backtype.storm.command; -import java.util.Map; -import java.security.InvalidParameterException; - import backtype.storm.generated.MonitorOptions; import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; +import java.security.InvalidParameterException; +import java.util.Map; + /** * Monitor topology * @@ -31,7 +31,7 @@ import backtype.storm.utils.Utils; * */ public class metrics_monitor { - + /** * @param args */ @@ -40,22 +40,22 @@ public class metrics_monitor { if (args == null || args.length <= 1) { throw new InvalidParameterException("Should input topology name and enable flag"); } - + String topologyName = args[0]; - + NimbusClient client = null; try { - + Map conf = Utils.readStormConfig(); client = NimbusClient.getConfiguredClient(conf); - + boolean isEnable = Boolean.valueOf(args[1]).booleanValue(); - + MonitorOptions options = new MonitorOptions(); options.set_isEnable(isEnable); - + client.getClient().metricMonitor(topologyName, options); - + String str = (isEnable) ? 
"enable" : "disable"; System.out.println("Successfully submit command to " + str + " the monitor of " + topologyName); } catch (Exception e) { @@ -68,5 +68,5 @@ public class metrics_monitor { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/rebalance.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java index f0cf69f..6d08934 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java +++ b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java @@ -17,13 +17,12 @@ */ package backtype.storm.command; -import java.security.InvalidParameterException; -import java.util.Map; - import backtype.storm.generated.RebalanceOptions; import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; +import java.util.Map; + /** * Active topology * @@ -32,7 +31,7 @@ import backtype.storm.utils.Utils; */ public class rebalance { static final String REASSIGN_FLAG = "-r"; - + /** * @param args */ @@ -42,15 +41,15 @@ public class rebalance { printErrorInfo(); return; } - + int argsIndex = 0; String topologyName = null; - + try { RebalanceOptions options = new RebalanceOptions(); options.set_reassign(false); options.set_conf(null); - + if (args[argsIndex].equalsIgnoreCase(REASSIGN_FLAG)) { options.set_reassign(true); argsIndex++; @@ -64,7 +63,7 @@ public class rebalance { } else { topologyName = args[argsIndex]; } - + argsIndex++; if (args.length > argsIndex) { for (int i = argsIndex; i < args.length; i++) { @@ -85,32 +84,34 @@ public class rebalance { } } } - + submitRebalance(topologyName, options); - - System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" + options.get_wait_secs() + ", reassignFlag=" + options.is_reassign() + ", newConfiguration=" + options.get_conf()); + + 
System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" + + options.get_wait_secs() + ", reassignFlag=" + + options.is_reassign() + ", newConfiguration=" + options.get_conf()); } catch (Exception e) { System.out.println(e.getMessage()); e.printStackTrace(); throw new RuntimeException(e); } } - + private static void printErrorInfo() { System.out.println("Error: Invalid parameters!"); System.out.println("USAGE: jstorm rebalance [-r] TopologyName [DelayTime] [NewConfig]"); } - + public static void submitRebalance(String topologyName, RebalanceOptions options) throws Exception { submitRebalance(topologyName, options, null); } - + public static void submitRebalance(String topologyName, RebalanceOptions options, Map conf) throws Exception { Map stormConf = Utils.readStormConfig(); if (conf != null) { stormConf.putAll(conf); } - + NimbusClient client = null; try { client = NimbusClient.getConfiguredClient(stormConf); @@ -123,5 +124,5 @@ public class rebalance { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/restart.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/restart.java b/jstorm-core/src/main/java/backtype/storm/command/restart.java index ecec9a3..5a216ea 100755 --- a/jstorm-core/src/main/java/backtype/storm/command/restart.java +++ b/jstorm-core/src/main/java/backtype/storm/command/restart.java @@ -45,26 +45,26 @@ public class restart { if (args == null || args.length == 0) { throw new InvalidParameterException("Should input topology name"); } - + String topologyName = args[0]; - + NimbusClient client = null; try { Map conf = Utils.readStormConfig(); client = NimbusClient.getConfiguredClient(conf); - + System.out.println("It will take 15 ~ 100 seconds to restart, please wait patiently\n"); - + if (args.length == 1) { client.getClient().restart(topologyName, null); } else 
{ Map loadConf = Utils.loadConf(args[1]); String jsonConf = Utils.to_json(loadConf); System.out.println("New configuration:\n" + jsonConf); - + client.getClient().restart(topologyName, jsonConf); } - + System.out.println("Successfully submit command restart " + topologyName); } catch (Exception e) { System.out.println(e.getMessage()); @@ -76,5 +76,5 @@ public class restart { } } } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/update_config.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_config.java b/jstorm-core/src/main/java/backtype/storm/command/update_config.java deleted file mode 100644 index be78f19..0000000 --- a/jstorm-core/src/main/java/backtype/storm/command/update_config.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Update user configuration
- * 
- * @author basti
- * 
- */
-public class update_config {
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-        // TODO Auto-generated method stub
-        if (args == null || args.length < 2) {
-            throw new InvalidParameterException(
-                    "[USAGE] update_config topologyName config");
-        }
-
-        String topologyName = args[0];
-
-        NimbusClient client = null;
-        try {
-            Map conf = Utils.readStormConfig();
-            client = NimbusClient.getConfiguredClient(conf);
-
-            Map loadConf = Utils.loadConf(args[1]);
-            String jsonConf = Utils.to_json(loadConf);
-            System.out.println("New configuration:\n" + jsonConf);
-
-            client.getClient().updateConf(topologyName, jsonConf);
-
-            System.out.println("Successfully submit command update_conf "
-                    + topologyName);
-        } catch (Exception e) {
-            System.out.println(e.getMessage());
-            e.printStackTrace();
-            throw new RuntimeException(e);
-        } finally {
-            if (client != null) {
-                client.close();
-            }
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_topology.java b/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
new file mode 100644
index 0000000..85172a7
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/update_topology.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import backtype.storm.GenericOptionsParser;
+import backtype.storm.StormSubmitter;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Command line tool that updates a running topology: its configuration, its jar, or both.
+ *
+ * Created by xiaojian.fxj on 2015/10/13.
+ */
+public class update_topology {
+    public static final String UPDATE_CONF = "-conf";
+
+    public static final String UPDATE_JAR = "-jar";
+
+    /** Prints the supported command line forms to standard output. */
+    public static void usage() {
+        System.out.println("update topology config, please do as following:");
+        System.out.println("update_topology topologyName -conf configFile");
+
+        System.out.println("update topology jar, please do as following:");
+        System.out.println("update_topology topologyName -jar jarFile");
+
+        System.out.println("update topology jar and conf, please do as following:");
+        System.out.println("update_topology topologyName -jar jarFile -conf configFile");
+    }
+
+    /**
+     * Returns a new option set holding everything in {@code opts} plus the {@code jar} and {@code conf} options.
+     */
+    private static Options buildGeneralOptions(Options opts) {
+        Options r = new Options();
+
+        for (Object o : opts.getOptions()) {
+            r.addOption((Option) o);
+        }
+
+        Option jar = OptionBuilder.withArgName("path").hasArg()
+                .withDescription("comma jar of the submitted topology")
+                .create("jar");
+        r.addOption(jar);
+
+        Option conf = OptionBuilder.withArgName("configuration file").hasArg()
+                .withDescription("an application configuration file")
+                .create("conf");
+        r.addOption(conf);
+        return r;
+    }
+
+    /**
+     * Uploads every library jar named under {@link GenericOptionsParser#TOPOLOGY_LIB_NAME} to
+     * {@code path + "/lib/"}; does nothing when no library is configured.
+     *
+     * @throws IllegalArgumentException if a listed library has no entry under
+     *         {@link GenericOptionsParser#TOPOLOGY_LIB_PATH}
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private static void uploadLibJars(Map conf, String path, NimbusClient client) throws Exception {
+        List<String> lib = (List<String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
+        if (lib == null || lib.isEmpty()) {
+            return;
+        }
+
+        Map<String, String> libPath = (Map<String, String>) conf.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
+        for (String libName : lib) {
+            // Fail with a readable message instead of a NullPointerException on an incomplete lib mapping.
+            String localJar = (libPath == null) ? null : libPath.get(libName);
+            if (localJar == null) {
+                throw new IllegalArgumentException("No entry in " + GenericOptionsParser.TOPOLOGY_LIB_PATH
+                        + " for library " + libName);
+            }
+
+            String jarPath = path + "/lib/" + libName;
+            client.getClient().beginLibUpload(jarPath);
+            StormSubmitter.submitJar(conf, localJar, jarPath, client);
+        }
+    }
+
+    /**
+     * Uploads the new jar, together with any configured library jars, when {@code pathJar} is given, then asks
+     * nimbus to update {@code topologyName} with the upload location and the configuration from {@code pathConf}.
+     *
+     * @param topologyName name of the topology to update
+     * @param pathJar local path of the new topology jar, or {@code null} to send no jar
+     * @param pathConf configuration file to load, or {@code null} to send an empty configuration
+     * @throws RuntimeException wrapping any failure raised during the upload or the update call
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private static void updateTopology(String topologyName, String pathJar, String pathConf) {
+        Map loadMap;
+        if (pathConf != null) {
+            loadMap = Utils.loadConf(pathConf);
+        } else {
+            loadMap = new HashMap();
+        }
+
+        Map conf = Utils.readStormConfig();
+        conf.putAll(loadMap);
+
+        NimbusClient client = NimbusClient.getConfiguredClient(conf);
+        try {
+            // Stays null when only the configuration is being updated.
+            String uploadLocation = null;
+            if (pathJar != null) {
+                System.out.println("Jar update to master yet. Submitting jar of " + pathJar);
+                String path = client.getClient().beginFileUpload();
+                String[] pathCache = path.split("/");
+                uploadLocation = path + "/stormjar-" + pathCache[pathCache.length - 1] + ".jar";
+
+                uploadLibJars(conf, path, client);
+                StormSubmitter.submitJar(conf, pathJar, uploadLocation, client);
+            }
+
+            String jsonConf = Utils.to_json(loadMap);
+            System.out.println("New configuration:\n" + jsonConf);
+
+            client.getClient().updateTopology(topologyName, uploadLocation, jsonConf);
+
+            System.out.println("Successfully submit command update " + topologyName);
+        } catch (Exception e) {
+            // main() reports the failure; the cause is preserved so nothing is lost by not printing here.
+            throw new RuntimeException("Failed to update topology " + topologyName, e);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    /**
+     * Parses {@code topologyName [-jar jarFile] [-conf configFile]} and runs the update.
+     *
+     * @param args command line arguments; the first entry is the topology name
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length < 3) {
+            System.out.println("Invalid parameter");
+            usage();
+            return;
+        }
+        String topologyName = args[0];
+        try {
+            String[] optionArgs = Arrays.copyOfRange(args, 1, args.length);
+            CommandLineParser parser = new GnuParser();
+            Options options = buildGeneralOptions(new Options());
+            CommandLine commandLine = parser.parse(options, optionArgs, true);
+
+            String pathConf = commandLine.getOptionValue("conf");
+            String pathJar = commandLine.getOptionValue("jar");
+            if (pathConf == null && pathJar == null) {
+                // Neither option was recognised: say so instead of exiting silently.
+                System.out.println("Invalid parameter");
+                usage();
+                return;
+            }
+            updateTopology(topologyName, pathJar, pathConf);
+        } catch (ParseException e) {
+            System.out.println("Invalid parameter: " + e.getMessage());
+            usage();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
index 8653010..d9163e5 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
@@ -32,18 +32,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
-    public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
+    public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
 
     byte[] _boltSer;
     Map<Object, IBatchBolt> _openTransactions;
     Map _conf;
     TopologyContext _context;
     BatchOutputCollectorImpl _collector;
-
+
     public BatchBoltExecutor(IBatchBolt bolt) {
         _boltSer = Utils.javaSerialize(bolt);
     }
-
+
@Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _conf = conf; @@ -57,11 +57,11 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa Object id = input.getValue(0); IBatchBolt bolt = getBatchBolt(id); try { - bolt.execute(input); + bolt.execute(input); _collector.ack(input); - } catch(FailedException e) { + } catch (FailedException e) { LOG.error("Failed to process tuple in batch", e); - _collector.fail(input); + _collector.fail(input); } } @@ -78,30 +78,29 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa @Override public void timeoutId(Object attempt) { - _openTransactions.remove(attempt); - } - + _openTransactions.remove(attempt); + } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { newTransactionalBolt().declareOutputFields(declarer); } - + @Override public Map<String, Object> getComponentConfiguration() { return newTransactionalBolt().getComponentConfiguration(); } - + private IBatchBolt getBatchBolt(Object id) { IBatchBolt bolt = _openTransactions.get(id); - if(bolt==null) { + if (bolt == null) { bolt = newTransactionalBolt(); bolt.prepare(_conf, _context, _collector, id); - _openTransactions.put(id, bolt); + _openTransactions.put(id, bolt); } return bolt; } - + private IBatchBolt newTransactionalBolt() { return Utils.javaDeserialize(_boltSer, IBatchBolt.class); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java index f5f3457..0b99339 100755 --- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java +++ 
b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java @@ -30,17 +30,16 @@ public abstract class BatchOutputCollector { } public abstract List<Integer> emit(String streamId, List<Object> tuple); - + /** - * Emits a tuple to the specified task on the default output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. + * Emits a tuple to the specified task on the default output stream. This output stream must have been declared as a direct stream, and the specified task + * must use a direct grouping on this stream to receive the message. */ public void emitDirect(int taskId, List<Object> tuple) { emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple); } - - public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); - + + public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); + public abstract void reportError(Throwable error); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java index cae7560..44a1f01 100755 --- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java +++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java @@ -23,11 +23,11 @@ import java.util.List; public class BatchOutputCollectorImpl extends BatchOutputCollector { OutputCollector _collector; - + public BatchOutputCollectorImpl(OutputCollector collector) { _collector = collector; } - + @Override public List<Integer> emit(String streamId, List<Object> tuple) { return _collector.emit(streamId, tuple); @@ -42,11 
+42,11 @@ public class BatchOutputCollectorImpl extends BatchOutputCollector { public void reportError(Throwable error) { _collector.reportError(error); } - + public void ack(Tuple tup) { _collector.ack(tup); } - + public void fail(Tuple tup) { _collector.fail(tup); }
