Bob Tiernay created STORM-812:
---------------------------------

             Summary: Create Testing#createLocalCluster
                 Key: STORM-812
                 URL: https://issues.apache.org/jira/browse/STORM-812
             Project: Apache Storm
          Issue Type: Improvement
            Reporter: Bob Tiernay

Currently, there is a method called {{Testing#withLocalCluster}} that allows one to customize the configuration of a {{LocalCluster}} in ways that are not possible using {{new LocalCluster}} (e.g. number of supervisors, number of workers per supervisor, etc.). However, this method controls the lifecycle of the cluster, which is very inconvenient when that is not desired. This has been discussed in:

http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround currently required is less than ideal:

{code}
// Uses Lombok (val, @NonNull, @SneakyThrows) and Guava (ImmutableList).
@NonNull
public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
  val daemonConf = new Config();
  daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
  daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
  daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

  val clusterParams = new MkClusterParam();
  clusterParams.setSupervisors(5);
  clusterParams.setPortsPerSupervisor(5);
  clusterParams.setDaemonConf(daemonConf);

  return createLocalCluster(clusterParams);
}

/**
 * Hack to override local cluster workers.
 */
@SneakyThrows
private static LocalCluster createLocalCluster(final MkClusterParam clusterParams) {
  val reference = new AtomicReference<LocalCluster>();
  val latch = new CountDownLatch(1);

  // Run withLocalCluster on a daemon thread so its job can block without blocking the caller.
  val thread = new Thread(new Runnable() {

    @Override
    public void run() {
      Testing.withLocalCluster(clusterParams, new TestJob() {

        @Override
        @SneakyThrows
        public void run(final ILocalCluster cluster) {
          // Hand the cluster to the calling thread.
          reference.set((LocalCluster) cluster);
          latch.countDown();

          // Wait forever so that withLocalCluster never shuts the cluster down
          synchronized (this) {
            while (true) {
              this.wait();
            }
          }
        }

      });
    }

  });

  thread.setDaemon(true);
  thread.start();

  // Block until the job has published the cluster.
  latch.await();
  return reference.get();
}
{code}

For greater flexibility, a {{Testing#createLocalCluster}} method should be added that returns the configured cluster and leaves its lifecycle to the caller.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
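
The hand-off pattern in the workaround can be shown without a Storm dependency. The following is only a minimal sketch: {{FakeCluster}}, {{withFakeCluster}}, {{Lease}} and {{createFakeCluster}} are hypothetical names invented for illustration and are not part of Storm. {{withFakeCluster}} stands in for a scoped method such as {{Testing#withLocalCluster}} that shuts the cluster down as soon as the job returns. Unlike the workaround above, the sketch adds a second latch so that the caller can release the cluster instead of parking the background thread forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ScopedHandoff {

    /** Hypothetical stand-in for a cluster; it only tracks whether it is running. */
    static final class FakeCluster {
        private volatile boolean running = true;
        boolean isRunning() { return running; }
        void shutdown() { running = false; }
    }

    /** Hypothetical stand-in for a scoped API: it owns the cluster lifecycle. */
    static void withFakeCluster(Consumer<FakeCluster> job) {
        FakeCluster cluster = new FakeCluster();
        try {
            job.accept(cluster);
        } finally {
            cluster.shutdown(); // always torn down once the job returns
        }
    }

    /** Handle given to the caller; close() lets the scoped job return. */
    static final class Lease {
        private final FakeCluster cluster;
        private final CountDownLatch release;
        private final Thread owner;

        Lease(FakeCluster cluster, CountDownLatch release, Thread owner) {
            this.cluster = cluster;
            this.release = release;
            this.owner = owner;
        }

        FakeCluster cluster() { return cluster; }

        void close() throws InterruptedException {
            release.countDown(); // unblock the job so the scoped API can clean up
            owner.join();        // wait until that cleanup has finished
        }
    }

    static Lease createFakeCluster() throws InterruptedException {
        AtomicReference<FakeCluster> reference = new AtomicReference<>();
        CountDownLatch ready = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread owner = new Thread(() -> withFakeCluster(cluster -> {
            reference.set(cluster); // publish the cluster to the caller
            ready.countDown();
            try {
                release.await();    // park until close() is called
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        owner.setDaemon(true);
        owner.start();

        ready.await(); // block until the job has published the cluster
        return new Lease(reference.get(), release, owner);
    }

    public static void main(String[] args) throws Exception {
        Lease lease = createFakeCluster();
        System.out.println("running after create: " + lease.cluster().isRunning());
        lease.close();
        System.out.println("running after close: " + lease.cluster().isRunning());
    }
}
```

Running {{main}} prints {{running after create: true}} followed by {{running after close: false}}. The extra thread and latches exist only to escape the scoped lifecycle, which is the overhead a method that simply returns the cluster would avoid.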