[jira] [Created] (STORM-812) Create Testing#createLocalCluster

2015-05-10 Thread Bob Tiernay (JIRA)
Bob Tiernay created STORM-812:
-

 Summary: Create Testing#createLocalCluster
 Key: STORM-812
 URL: https://issues.apache.org/jira/browse/STORM-812
 Project: Apache Storm
  Issue Type: Improvement
Reporter: Bob Tiernay


Currently, {{Testing#withLocalCluster}} allows one to customize the 
configuration of a {{LocalCluster}} in ways that are not possible using 
{{new LocalCluster}} (e.g. number of supervisors, number of workers per 
supervisor, etc.). However, this method controls the lifecycle of the cluster, 
which is very inconvenient when that is not desired. This has been discussed in 

http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround required is less than ideal:

{code}
  @NonNull
  public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
    val daemonConf = new Config();
    daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
    daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
    daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

    val clusterParams = new MkClusterParam();
    clusterParams.setSupervisors(5);
    clusterParams.setPortsPerSupervisor(5);
    clusterParams.setDaemonConf(daemonConf);

    return createLocalCluster(clusterParams);
  }

  /**
   * Hack to override local cluster workers.
   */
  @SneakyThrows
  private static LocalCluster createLocalCluster(final MkClusterParam clusterParams) {
    val reference = new AtomicReference<LocalCluster>();
    val latch = new CountDownLatch(1);
    val thread = new Thread(new Runnable() {

      @Override
      public void run() {
        Testing.withLocalCluster(clusterParams, new TestJob() {

          @Override
          @SneakyThrows
          public void run(final ILocalCluster cluster) {
            // Hand the cluster to the caller, then signal that it is ready
            reference.set((LocalCluster) cluster);
            latch.countDown();

            // Wait forever so that withLocalCluster never shuts the cluster down
            synchronized (this) {
              while (true) {
                this.wait();
              }
            }
          }

        });
      }

    });

    // Daemon thread, so the parked job does not keep the JVM alive
    thread.setDaemon(true);
    thread.start();

    // Block until the job has published the cluster
    latch.await();
    return reference.get();
  }
{code}

For greater flexibility, a {{Testing#createLocalCluster}} method should be added.
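
The workaround pattern above can be reproduced without Storm. The following is a minimal sketch only: {{Resource}} and {{withResource}} are hypothetical stand-ins for {{LocalCluster}} and {{Testing#withLocalCluster}}, not Storm APIs. It shows why the job has to park forever: the scoped method tears the resource down as soon as the job returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ScopedEscapeSketch {

    /** Hypothetical stand-in for LocalCluster. */
    static final class Resource {
        private volatile boolean open = true;
        boolean isOpen() { return open; }
        void close() { open = false; }
    }

    /** Hypothetical stand-in for Testing#withLocalCluster: it owns the lifecycle. */
    static void withResource(Consumer<Resource> job) {
        Resource resource = new Resource();
        try {
            job.accept(resource);
        } finally {
            resource.close(); // torn down as soon as the job returns
        }
    }

    /** The workaround: park the job on a daemon thread so the resource stays open. */
    static Resource escape() throws InterruptedException {
        AtomicReference<Resource> reference = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(() -> withResource(resource -> {
            // Publish the resource, then signal the waiting caller
            reference.set(resource);
            latch.countDown();
            // Never return normally, otherwise withResource would close the resource
            Object lock = new Object();
            synchronized (lock) {
                while (true) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return; // only an interrupt ends the job
                    }
                }
            }
        }));
        thread.setDaemon(true); // do not keep the JVM alive for the parked job
        thread.start();
        latch.await();
        return reference.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Resource resource = escape();
        System.out.println("open after escape: " + resource.isOpen());
    }
}
```

The cost of the pattern is visible here: one thread is parked per cluster for the rest of the JVM's life, and the caller has no clean way to shut the resource down.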



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (STORM-812) Create Testing#createLocalCluster

2015-05-10 Thread Bob Tiernay (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Tiernay updated STORM-812:
--
Description: 
Currently, {{Testing#withLocalCluster}} allows one to customize the 
configuration of a {{LocalCluster}} in ways that are not possible using 
{{new LocalCluster}}. In fact, as far as I can tell this is the only way 
to specify the number of supervisors and the number of workers per supervisor. 
However, this method controls the lifecycle of the cluster, which is very 
inconvenient when that is not desired. This has been discussed in 

http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround required is less than ideal:

{code}
  @NonNull
  public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
    val daemonConf = new Config();
    daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
    daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
    daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

    val clusterParams = new MkClusterParam();
    clusterParams.setSupervisors(5);
    clusterParams.setPortsPerSupervisor(5);
    clusterParams.setDaemonConf(daemonConf);

    return createLocalCluster(clusterParams);
  }

  /**
   * Hack to override local cluster workers.
   */
  @SneakyThrows
  private static LocalCluster createLocalCluster(final MkClusterParam clusterParams) {
    val reference = new AtomicReference<LocalCluster>();
    val latch = new CountDownLatch(1);
    val thread = new Thread(new Runnable() {

      @Override
      public void run() {
        Testing.withLocalCluster(clusterParams, new TestJob() {

          @Override
          @SneakyThrows
          public void run(final ILocalCluster cluster) {
            // Hand the cluster to the caller, then signal that it is ready
            reference.set((LocalCluster) cluster);
            latch.countDown();

            // Wait forever so that withLocalCluster never shuts the cluster down
            synchronized (this) {
              while (true) {
                this.wait();
              }
            }
          }

        });
      }

    });

    // Daemon thread, so the parked job does not keep the JVM alive
    thread.setDaemon(true);
    thread.start();

    // Block until the job has published the cluster
    latch.await();
    return reference.get();
  }
{code}

For greater flexibility, a {{Testing#createLocalCluster}} method should be added.


[jira] [Commented] (STORM-812) Create Testing#createLocalCluster

2015-05-10 Thread Bob Tiernay (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537397#comment-14537397
 ] 

Bob Tiernay commented on STORM-812:
---

The other option is to add a new constructor to {{LocalCluster}} that accepts a 
{{MkClusterParam}}.
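
A rough sketch of what the constructor option could look like from the caller's side. {{ClusterParams}} and {{Cluster}} are hypothetical stand-ins for {{MkClusterParam}} and {{LocalCluster}}; the field names and default values are assumptions made for illustration, and no such constructor exists in Storm today.

```java
public class ConstructorSketch {

    /** Hypothetical stand-in for MkClusterParam; defaults are illustrative only. */
    static final class ClusterParams {
        int supervisors = 1;
        int portsPerSupervisor = 1;
    }

    /** Hypothetical stand-in for LocalCluster with the proposed constructor. */
    static final class Cluster implements AutoCloseable {
        private final int workerSlots;
        private boolean running = true;

        Cluster(ClusterParams params) {
            // One worker slot per supervisor port
            this.workerSlots = params.supervisors * params.portsPerSupervisor;
        }

        int workerSlots() { return workerSlots; }
        boolean isRunning() { return running; }

        @Override
        public void close() { running = false; }
    }

    public static void main(String[] args) {
        ClusterParams params = new ClusterParams();
        params.supervisors = 5;
        params.portsPerSupervisor = 5;

        // The caller, not a callback wrapper, decides when the cluster stops
        Cluster cluster = new Cluster(params);
        System.out.println("worker slots: " + cluster.workerSlots());
        cluster.close();
    }
}
```

With this shape the caller owns the lifecycle directly, so no parked thread is needed.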



[jira] [Commented] (STORM-811) Hive test fail after first run

2015-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537509#comment-14537509
 ] 

ASF GitHub Bot commented on STORM-811:
--

Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/547#issuecomment-100752288
  
@revans2  I am +1 on the patch. Good to remove metastore_db before running 
tests.


 Hive test fail after first run
 --

 Key: STORM-811
 URL: https://issues.apache.org/jira/browse/STORM-811
 Project: Apache Storm
  Issue Type: Bug
Reporter: Robert Joseph Evans
Assignee: Robert Joseph Evans

 I don't know why, but on my Mac (not on Linux, or on other people's Macs), 
 leaving the Derby metastore DB on the file system results in the tests failing 
 the second time with an array index out of bounds error, as captured by 
 derby.log.
 I can work around this by cleaning out the DB state before running the tests. 
 I really don't know why, but it is 100% reproducible, just for me :). 
 So I am updating the build to clean it up before the tests run.





[GitHub] storm pull request: STORM-811: remove old metastor_db before runni...

2015-05-10 Thread harshach
Github user harshach commented on the pull request:

https://github.com/apache/storm/pull/547#issuecomment-100752288
  
@revans2  I am +1 on the patch. Good to remove metastore_db before running 
tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (STORM-561) Add ability to create topologies dynamically

2015-05-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537136#comment-14537136
 ] 

ASF GitHub Bot commented on STORM-561:
--

Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/546#issuecomment-100625034
  
Good job! We also developed a component like FLUX. I'm happy to try.


 Add ability to create topologies dynamically
 

 Key: STORM-561
 URL: https://issues.apache.org/jira/browse/STORM-561
 Project: Apache Storm
  Issue Type: Improvement
Reporter: Nathan Leung
Assignee: P. Taylor Goetz
   Original Estimate: 336h
  Remaining Estimate: 336h

 It would be nice if a storm topology could be built dynamically, instead of 
 requiring a recompile to change parameters (e.g. number of workers, number of 
 tasks, layout, etc).
 I would propose the following data structures for building core storm 
 topologies.  I haven't done a design for trident yet but the intention would 
 be to add trident support when core storm support is complete (or in parallel 
 if there are other people working on it):
 {code}
 import java.util.List;
 import java.util.Map;

 // fields value and arguments are mutually exclusive
 class Argument {
     String argumentType;       // Class used to look up arguments in method/constructor
     String implementationType; // Class used to create this argument
     String value;              // String used to construct this argument
     List<Argument> arguments;  // arguments used to build this argument
 }
 class Dependency {
     String upstreamComponent;  // name of upstream component
     String grouping;
     List<Argument> arguments;  // arguments for the grouping
 }
 class StormSpout {
     String name;
     String klazz;              // Class of this spout
     List<Argument> arguments;
     int numTasks;
     int numExecutors;
 }
 class StormBolt {
     String name;
     String klazz;              // Class of this bolt
     List<Argument> arguments;
     int numTasks;
     int numExecutors;
     List<Dependency> dependencies;
 }
 class StormTopologyRepresentation {
     String name;
     List<StormSpout> spouts;
     List<StormBolt> bolts;
     Map config;
     int numWorkers;
 }
 {code}
 Topology creation will be built on top of the data structures above.  The 
 benefits:
 * Dependency free.  Code to unmarshal from JSON, XML, etc., can be kept in 
 extensions, or as examples, and users can write a different unmarshaller if 
 they want to use a different text representation.
 * support for arbitrary spout and bolt types
 * support for all groupings and streams, via reflection
 * ability to specify the configuration map via a config file
 * reification of spout / bolt / dependency arguments
 ** recursive argument reification for complex objects
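
 The recursive argument reification in the last bullet could be sketched roughly as follows. This is an illustration under assumptions, not the design from the issue or the Flux implementation: the {{leaf}} and {{composite}} helpers and the {{reify}} method are hypothetical names, leaf values are assumed to have a public single-{{String}} constructor, and primitive parameter types are not handled.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReifySketch {

    /** Mirrors the proposed Argument class; value and arguments are mutually exclusive. */
    static final class Argument {
        String argumentType;       // class used to look up the constructor parameter
        String implementationType; // class used to create this argument
        String value;              // string used to construct this argument
        List<Argument> arguments = new ArrayList<>();

        static Argument leaf(String argumentType, String implementationType, String value) {
            Argument a = new Argument();
            a.argumentType = argumentType;
            a.implementationType = implementationType;
            a.value = value;
            return a;
        }

        static Argument composite(String argumentType, String implementationType,
                                  Argument... arguments) {
            Argument a = new Argument();
            a.argumentType = argumentType;
            a.implementationType = implementationType;
            a.arguments = Arrays.asList(arguments);
            return a;
        }
    }

    /** Builds the object described by arg, reifying nested arguments first. */
    static Object reify(Argument arg) throws ReflectiveOperationException {
        Class<?> impl = Class.forName(arg.implementationType);
        if (arg.value != null) {
            // Leaf: assumes the implementation type has a public (String) constructor
            return impl.getConstructor(String.class).newInstance(arg.value);
        }
        Class<?>[] types = new Class<?>[arg.arguments.size()];
        Object[] values = new Object[arg.arguments.size()];
        for (int i = 0; i < values.length; i++) {
            Argument child = arg.arguments.get(i);
            types[i] = Class.forName(child.argumentType); // declared parameter type
            values[i] = reify(child);                     // recursive step
        }
        Constructor<?> constructor = impl.getConstructor(types);
        return constructor.newInstance(values);
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // Describes: new StringBuilder((CharSequence) new String("storm"))
        Argument text = Argument.leaf("java.lang.CharSequence", "java.lang.String", "storm");
        Argument builder = Argument.composite(
                "java.lang.StringBuilder", "java.lang.StringBuilder", text);
        Object built = reify(builder);
        System.out.println(built.getClass().getName() + ": " + built);
    }
}
```

 The example shows why the two type fields are separate: the constructor is looked up by {{argumentType}} ({{CharSequence}}) while the value itself is created from {{implementationType}} ({{String}}).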





[GitHub] storm pull request: STORM-561: Add flux as an external module

2015-05-10 Thread vesense
Github user vesense commented on the pull request:

https://github.com/apache/storm/pull/546#issuecomment-100625034
  
Good job! We also developed a component like FLUX. I'm happy to try.

