[ 
https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14537397#comment-14537397
 ] 

Bob Tiernay commented on STORM-812:
-----------------------------------

The other option is to add a new constructor to {{LocalCluster}} that accepts a 
{{MkClusterParam}}.

> Create Testing#createLocalCluster
> ---------------------------------
>
>                 Key: STORM-812
>                 URL: https://issues.apache.org/jira/browse/STORM-812
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Bob Tiernay
>
> Currently, there is a method called {{Testing#withLocalCluster}} that allows 
> one to customize the configuration of a {{LocalCluster}} that isn't possible 
> using {{new LocalCluster}}. In fact, as far as I can tell this is the only 
> way to specify the number supervisors and number of workers per supervisor. 
> However, this method controls the life-cycle of the cluster and thus is very 
> inconvenient when that is not desired. This has been discussed in 
> http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy
> The workaround required is less than ideal:
> {code}
>   @NonNull
>   public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
>     val daemonConf = new Config();
>     daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
>     daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
>     daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);
>     val clusterParams = new MkClusterParam();
>     clusterParams.setSupervisors(5);
>     clusterParams.setPortsPerSupervisor(5);
>     clusterParams.setDaemonConf(daemonConf);
>     return createLocalCluster(clusterParams);
>   }
>   /**
>    * Hack to override local cluster workers.
>    */
>   @SneakyThrows
>   private static LocalCluster createLocalCluster(final MkClusterParam 
> clusterParams) {
>     val reference = new AtomicReference<LocalCluster>();
>     val latch = new CountDownLatch(1);
>     val thread = new Thread(new Runnable() {
>       @Override
>       public void run() {
>         Testing.withLocalCluster(clusterParams,
>             new TestJob() {
>               @Override
>               @SneakyThrows
>               public void run(final ILocalCluster cluster) {
>                 reference.set((LocalCluster) cluster);
>                 latch.countDown();
>                 // Wait forever
>                 synchronized (this) {
>                   while (true) {
>                     this.wait();
>                   }
>                 }
>               }
>             });
>       }
>     });
>     thread.setDaemon(true);
>     thread.start();
>     latch.await();
>     return reference.get();
>   }
> }
> {code}
> For greater flexibility, a {{createLocalCluster}} should be created.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to