[jira] [Created] (STORM-812) Create Testing#createLocalCluster
Bob Tiernay created STORM-812:
---------------------------------

Summary: Create Testing#createLocalCluster
Key: STORM-812
URL: https://issues.apache.org/jira/browse/STORM-812
Project: Apache Storm
Issue Type: Improvement
Reporter: Bob Tiernay

Currently, there is a method called {{Testing#withLocalCluster}} that allows one to customize the configuration of a {{LocalCluster}} in ways that aren't possible using {{new LocalCluster}} (e.g. number of supervisors, number of workers per supervisor, etc.). However, this method controls the lifecycle of the cluster and thus is very inconvenient when that is not desired. This has been discussed in http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround required is less than ideal.

{code}
@NonNull
public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
    val daemonConf = new Config();
    daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
    daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
    daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

    val clusterParams = new MkClusterParam();
    clusterParams.setSupervisors(5);
    clusterParams.setPortsPerSupervisor(5);
    clusterParams.setDaemonConf(daemonConf);

    return createLocalCluster(clusterParams);
}

/**
 * Hack to override local cluster workers.
 */
@SneakyThrows
private static LocalCluster createLocalCluster(final MkClusterParam clusterParams) {
    val reference = new AtomicReference<LocalCluster>();
    val latch = new CountDownLatch(1);
    val thread = new Thread(new Runnable() {

        @Override
        public void run() {
            Testing.withLocalCluster(clusterParams, new TestJob() {

                @Override
                @SneakyThrows
                public void run(final ILocalCluster cluster) {
                    // Hand the cluster to the caller, then signal that it is ready
                    reference.set((LocalCluster) cluster);
                    latch.countDown();

                    // Wait forever so that withLocalCluster never shuts the cluster down
                    synchronized (this) {
                        while (true) {
                            this.wait();
                        }
                    }
                }

            });
        }

    });

    thread.setDaemon(true);
    thread.start();

    latch.await();
    return reference.get();
}
{code}

For greater flexibility, a {{createLocalCluster}} should be created.
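For illustration only, the thread-parking trick in the workaround can be sketched in plain Java with no Storm classes. Everything named here is hypothetical: {{ScopedResource}} stands in for a {{LocalCluster}}, {{ScopedResource.with}} for a with-style method that owns the lifecycle, and {{Held}} is an invented helper. Unlike the workaround, this variant parks on a second latch rather than waiting forever, so the holder thread can be released and the resource shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for a cluster whose lifetime is bound to a callback. */
final class ScopedResource {
    private volatile boolean open = true;

    boolean isOpen() { return open; }

    /** Mimics a with-style API: create, run the job, always shut down afterwards. */
    static void with(Consumer<ScopedResource> job) {
        ScopedResource resource = new ScopedResource();
        try {
            job.accept(resource);
        } finally {
            resource.open = false;
        }
    }
}

/** Hypothetical helper that keeps the resource alive outside the callback. */
final class Held {
    private final AtomicReference<ScopedResource> reference = new AtomicReference<>();
    private final CountDownLatch ready = new CountDownLatch(1);
    private final CountDownLatch release = new CountDownLatch(1);
    private final Thread holder;

    Held() throws InterruptedException {
        holder = new Thread(() -> ScopedResource.with(resource -> {
            reference.set(resource);
            ready.countDown();
            try {
                // Park inside the callback so the with-style API does not shut down yet.
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        holder.setDaemon(true);
        holder.start();
        // Block the caller until the callback has published the resource.
        ready.await();
    }

    ScopedResource get() { return reference.get(); }

    /** Lets the callback return, which triggers the normal shutdown path. */
    void release() throws InterruptedException {
        release.countDown();
        holder.join();
    }
}
```

A caller would construct {{Held}}, use {{get()}} for as long as needed, and call {{release()}} when done. The sketch assumes the callback is always invoked; if the with-style method failed before calling it, the constructor would block.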
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (STORM-812) Create Testing#createLocalCluster
[ https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bob Tiernay updated STORM-812:
------------------------------

Description:

Currently, there is a method called {{Testing#withLocalCluster}} that allows one to customize the configuration of a {{LocalCluster}} in ways that aren't possible using {{new LocalCluster}}. In fact, as far as I can tell this is the only way to specify the number of supervisors and the number of workers per supervisor. However, this method controls the life-cycle of the cluster and thus is very inconvenient when that is not desired. This has been discussed in http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround required is less than ideal:

{code}
@NonNull
public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
    val daemonConf = new Config();
    daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
    daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
    daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

    val clusterParams = new MkClusterParam();
    clusterParams.setSupervisors(5);
    clusterParams.setPortsPerSupervisor(5);
    clusterParams.setDaemonConf(daemonConf);

    return createLocalCluster(clusterParams);
}

/**
 * Hack to override local cluster workers.
 */
@SneakyThrows
private static LocalCluster createLocalCluster(final MkClusterParam clusterParams) {
    val reference = new AtomicReference<LocalCluster>();
    val latch = new CountDownLatch(1);
    val thread = new Thread(new Runnable() {

        @Override
        public void run() {
            Testing.withLocalCluster(clusterParams, new TestJob() {

                @Override
                @SneakyThrows
                public void run(final ILocalCluster cluster) {
                    reference.set((LocalCluster) cluster);
                    latch.countDown();

                    // Wait forever
                    synchronized (this) {
                        while (true) {
                            this.wait();
                        }
                    }
                }

            });
        }

    });

    thread.setDaemon(true);
    thread.start();

    latch.await();
    return reference.get();
}
{code}

For greater flexibility, a {{createLocalCluster}} should be created.
[jira] [Commented] (STORM-812) Create Testing#createLocalCluster
[ https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537397#comment-14537397 ]

Bob Tiernay commented on STORM-812:
-----------------------------------

The other option is to add a new constructor to {{LocalCluster}} that accepts a {{MkClusterParam}}.
[jira] [Commented] (STORM-811) Hive test fail after first run
[ https://issues.apache.org/jira/browse/STORM-811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537509#comment-14537509 ]

ASF GitHub Bot commented on STORM-811:
--------------------------------------

Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/547#issuecomment-100752288

@revans2 I am +1 on the patch. Good to remove metastore_db before running tests.

Hive test fail after first run
------------------------------

Key: STORM-811
URL: https://issues.apache.org/jira/browse/STORM-811
Project: Apache Storm
Issue Type: Bug
Reporter: Robert Joseph Evans
Assignee: Robert Joseph Evans

I don't know why, but on my Mac (not on Linux, and not on other people's Macs), leaving the Derby metastore DB on the file system results in the tests failing the second time with an array index out of bounds error, as captured in derby.log. I can work around this by cleaning out the DB state before running the tests. I really don't know why, but it is 100% reproducible, though just for me :). So I am updating the build to clean it up before the tests run.
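As a rough sketch of the workaround described above (cleaning out the DB state before the tests run), a test setup step could delete the leftover directory recursively. This is not the patch from the pull request, which changes the build; the class name {{MetastoreCleaner}} and its method are invented for illustration, and only the directory name {{metastore_db}} comes from the comment above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper; not part of Storm or of the pull request. */
final class MetastoreCleaner {

    /**
     * Recursively deletes the given directory if it exists.
     * Returns true if something was removed, false if there was nothing to clean.
     */
    static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
        return true;
    }
}
```

A test class might call {{MetastoreCleaner.deleteRecursively(Paths.get("metastore_db"))}} from its setup method, assuming the tests run with the module directory as the working directory.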
[GitHub] storm pull request: STORM-811: remove old metastor_db before runni...
Github user harshach commented on the pull request: https://github.com/apache/storm/pull/547#issuecomment-100752288 @revans2 I am +1 on the patch. Good to remove metastore_db before running tests. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (STORM-561) Add ability to create topologies dynamically
[ https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537136#comment-14537136 ]

ASF GitHub Bot commented on STORM-561:
--------------------------------------

Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/546#issuecomment-100625034

Good job! We also developed a component like FLUX. I'm happy to try.

Add ability to create topologies dynamically
--------------------------------------------

Key: STORM-561
URL: https://issues.apache.org/jira/browse/STORM-561
Project: Apache Storm
Issue Type: Improvement
Reporter: Nathan Leung
Assignee: P. Taylor Goetz
Original Estimate: 336h
Remaining Estimate: 336h

It would be nice if a storm topology could be built dynamically, instead of requiring a recompile to change parameters (e.g. number of workers, number of tasks, layout, etc).

I would propose the following data structures for building core storm topologies. I haven't done a design for trident yet but the intention would be to add trident support when core storm support is complete (or in parallel if there are other people working on it):

{code}
// fields value and arguments are mutually exclusive
class Argument {
    String argumentType;       // Class used to lookup arguments in method/constructor
    String implementationType; // Class used to create this argument
    String value;              // String used to construct this argument
    List<Argument> arguments;  // arguments used to build this argument
}

class Dependency {
    String upstreamComponent; // name of upstream component
    String grouping;
    List<Argument> arguments; // arguments for the grouping
}

class StormSpout {
    String name;
    String klazz; // Class of this spout
    List<Argument> arguments;
    int numTasks;
    int numExecutors;
}

class StormBolt {
    String name;
    String klazz; // Class of this bolt
    List<Argument> arguments;
    int numTasks;
    int numExecutors;
    List<Dependency> dependencies;
}

class StormTopologyRepresentation {
    String name;
    List<StormSpout> spouts;
    List<StormBolt> bolts;
    Map config;
    int numWorkers;
}
{code}

Topology creation will be built on top of the data structures above. The benefits:
* Dependency free. Code to unmarshal from json, xml, etc, can be kept in extensions, or as examples, and users can write a different unmarshaller if they want to use a different text representation.
* support for arbitrary spout and bolt types
* support for all groupings and streams, via reflection
* ability to specify the configuration map via a config file
* reification of spout / bolt / dependency arguments
** recursive argument reification for complex objects
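The recursive argument reification mentioned in the last bullet could look roughly like the sketch below. It reuses the {{Argument}} fields from the proposal, but the constructor, the {{Reifier}} class, and the rules it applies are assumptions made for illustration: a leaf is built through a public constructor taking a single {{String}}, a composite is built through the public constructor whose parameter types match the nested {{argumentType}} values, and only reference types are handled.

```java
import java.util.Collections;
import java.util.List;

/** Mirrors the proposed Argument structure; the constructor is added for the example. */
final class Argument {
    final String argumentType;       // class used to look up the constructor parameter
    final String implementationType; // class instantiated for this argument
    final String value;              // string form, or null when arguments is used
    final List<Argument> arguments;  // nested arguments, or null when value is used

    Argument(String argumentType, String implementationType,
             String value, List<Argument> arguments) {
        this.argumentType = argumentType;
        this.implementationType = implementationType;
        this.value = value;
        this.arguments = arguments;
    }
}

/** Hypothetical reifier; one possible reading of the proposal, not an existing API. */
final class Reifier {

    static Object reify(Argument arg) throws ReflectiveOperationException {
        Class<?> impl = Class.forName(arg.implementationType);
        if (arg.value != null) {
            // Leaf: assume the implementation type has a public (String) constructor.
            return impl.getConstructor(String.class).newInstance(arg.value);
        }
        List<Argument> nested =
                arg.arguments == null ? Collections.<Argument>emptyList() : arg.arguments;
        Class<?>[] parameterTypes = new Class<?>[nested.size()];
        Object[] parameters = new Object[nested.size()];
        for (int i = 0; i < nested.size(); i++) {
            // The declared parameter type may be wider than the implementation type.
            parameterTypes[i] = Class.forName(nested.get(i).argumentType);
            parameters[i] = reify(nested.get(i));
        }
        return impl.getConstructor(parameterTypes).newInstance(parameters);
    }
}
```

For example, an {{Argument}} with implementation type {{java.lang.StringBuilder}} and one nested argument of argument type {{java.lang.CharSequence}}, implementation type {{java.lang.String}} and value "storm" resolves to the {{StringBuilder(CharSequence)}} constructor, which shows why the lookup type and the created type are kept as separate fields.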
[GitHub] storm pull request: STORM-561: Add flux as an external module
Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/546#issuecomment-100625034

Good job! We also developed a component like FLUX. I'm happy to try.