[jira] [Updated] (STORM-812) Create Testing#createLocalCluster

2015-09-28 Thread Rick Kellogg (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rick Kellogg updated STORM-812:
---
Component/s: storm-core

> Create Testing#createLocalCluster
> -
>
> Key: STORM-812
> URL: https://issues.apache.org/jira/browse/STORM-812
> Project: Apache Storm
>  Issue Type: Improvement
>  Components: storm-core
>Reporter: Bob Tiernay
>
> Currently, there is a method called {{Testing#withLocalCluster}} that allows 
> one to customize the configuration of a {{LocalCluster}} that isn't possible 
> using {{new LocalCluster}}. In fact, as far as I can tell this is the only 
> way to specify the number supervisors and number of workers per supervisor. 
> However, this method controls the life-cycle of the cluster and thus is very 
> inconvenient when that is not desired. This has been discussed in 
> http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy
> The workaround required is less than ideal:
> {code}
>   @NonNull
>   public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
> val daemonConf = new Config();
> daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
> daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
> daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);
> val clusterParams = new MkClusterParam();
> clusterParams.setSupervisors(5);
> clusterParams.setPortsPerSupervisor(5);
> clusterParams.setDaemonConf(daemonConf);
> return createLocalCluster(clusterParams);
>   }
>   /**
>* Hack to override local cluster workers.
>*/
>   @SneakyThrows
>   private static LocalCluster createLocalCluster(final MkClusterParam 
> clusterParams) {
> val reference = new AtomicReference();
> val latch = new CountDownLatch(1);
> val thread = new Thread(new Runnable() {
>   @Override
>   public void run() {
> Testing.withLocalCluster(clusterParams,
> new TestJob() {
>   @Override
>   @SneakyThrows
>   public void run(final ILocalCluster cluster) {
> reference.set((LocalCluster) cluster);
> latch.countDown();
> // Wait forever
> synchronized (this) {
>   while (true) {
> this.wait();
>   }
> }
>   }
> });
>   }
> });
> thread.setDaemon(true);
> thread.start();
> latch.await();
> return reference.get();
>   }
> }
> {code}
> For greater flexibility, a {{createLocalCluster}} should be created.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (STORM-812) Create Testing#createLocalCluster

2015-05-10 Thread Bob Tiernay (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Tiernay updated STORM-812:
--
Description: 
Currently, there is a method called {{Testing#withLocalCluster}} that allows 
one to customize the configuration of a {{LocalCluster}} that isn't possible 
using {{new LocalCluster}}. In fact, as far as I can tell this is the only way 
to specify the number supervisors and number of workers per supervisor. 
However, this method controls the life-cycle of the cluster and thus is very 
inconvenient when that is not desired. This has been discussed in 

http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround required is less than ideal:

{code}
  @NonNull
  public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
val daemonConf = new Config();
daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

val clusterParams = new MkClusterParam();
clusterParams.setSupervisors(5);
clusterParams.setPortsPerSupervisor(5);
clusterParams.setDaemonConf(daemonConf);

return createLocalCluster(clusterParams);
  }

  /**
   * Hack to override local cluster workers.
   */
  @SneakyThrows
  private static LocalCluster createLocalCluster(final MkClusterParam 
clusterParams) {
val reference = new AtomicReferenceLocalCluster();
val latch = new CountDownLatch(1);
val thread = new Thread(new Runnable() {

  @Override
  public void run() {
Testing.withLocalCluster(clusterParams,
new TestJob() {

  @Override
  @SneakyThrows
  public void run(final ILocalCluster cluster) {
reference.set((LocalCluster) cluster);
latch.countDown();

// Wait forever
synchronized (this) {
  while (true) {
this.wait();
  }
}
  }

});

  }

});

thread.setDaemon(true);
thread.start();

latch.await();
return reference.get();
  }

}
{code}

For greater flexibility, a {{createLocalCluster}} should be created.

  was:
Currently, there is a method called {{Testing#withLocalCluster}} that allows 
one to customize the configuration of a {{LocalCluster}} that isn't possible 
using {{new LocalCluster}} (e.g. number supervisors, number of workers per 
supervisor, etc.). However, this method controls the lifecycle of the cluster 
and thus is very inconvenient when that is not desired. This has been discussed 
in 

http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround required is less than ideal. 

{code}
  @NonNull
  public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
val daemonConf = new Config();
daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

val clusterParams = new MkClusterParam();
clusterParams.setSupervisors(5);
clusterParams.setPortsPerSupervisor(5);
clusterParams.setDaemonConf(daemonConf);

return createLocalCluster(clusterParams);
  }

  /**
   * Hack to override local cluster workers.
   */
  @SneakyThrows
  private static LocalCluster createLocalCluster(final MkClusterParam 
clusterParams) {
val reference = new AtomicReferenceLocalCluster();
val latch = new CountDownLatch(1);
val thread = new Thread(new Runnable() {

  @Override
  public void run() {
Testing.withLocalCluster(clusterParams,
new TestJob() {

  @Override
  @SneakyThrows
  public void run(final ILocalCluster cluster) {
reference.set((LocalCluster) cluster);
latch.countDown();

// Wait forever
synchronized (this) {
  while (true) {
this.wait();
  }
}
  }

});

  }

});

thread.setDaemon(true);
thread.start();

latch.await();
return reference.get();
  }

}
{code}

For greater flexibility, a {{createLocalCluster}} should be created.


 Create Testing#createLocalCluster
 -

 Key: STORM-812
 URL: https://issues.apache.org/jira/browse/STORM-812
 Project: Apache Storm
  Issue Type: Improvement
Reporter: Bob Tiernay

 Currently, there is a method called {{Testing#withLocalCluster}} that allows 
 one to customize the configuration of a {{LocalCluster}} that isn't possible 
 using