[
https://issues.apache.org/jira/browse/STORM-812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bob Tiernay updated STORM-812:
--
Description:
Currently, there is a method called {{Testing#withLocalCluster}} that allows
one to customize the configuration of a {{LocalCluster}} that isn't possible
using {{new LocalCluster}}. In fact, as far as I can tell this is the only way
to specify the number supervisors and number of workers per supervisor.
However, this method controls the life-cycle of the cluster and thus is very
inconvenient when that is not desired. This has been discussed in
http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy
The workaround required is less than ideal:
{code}
@NonNull
public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
val daemonConf = new Config();
daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);
val clusterParams = new MkClusterParam();
clusterParams.setSupervisors(5);
clusterParams.setPortsPerSupervisor(5);
clusterParams.setDaemonConf(daemonConf);
return createLocalCluster(clusterParams);
}
/**
* Hack to override local cluster workers.
*/
@SneakyThrows
private static LocalCluster createLocalCluster(final MkClusterParam
clusterParams) {
val reference = new AtomicReferenceLocalCluster();
val latch = new CountDownLatch(1);
val thread = new Thread(new Runnable() {
@Override
public void run() {
Testing.withLocalCluster(clusterParams,
new TestJob() {
@Override
@SneakyThrows
public void run(final ILocalCluster cluster) {
reference.set((LocalCluster) cluster);
latch.countDown();
// Wait forever
synchronized (this) {
while (true) {
this.wait();
}
}
}
});
}
});
thread.setDaemon(true);
thread.start();
latch.await();
return reference.get();
}
}
{code}
For greater flexibility, a {{createLocalCluster}} should be created.
was:
Currently, there is a method called {{Testing#withLocalCluster}} that allows
one to customize the configuration of a {{LocalCluster}} that isn't possible
using {{new LocalCluster}} (e.g. number supervisors, number of workers per
supervisor, etc.). However, this method controls the lifecycle of the cluster
and thus is very inconvenient when that is not desired. This has been discussed
in
http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy
The workaround required is less than ideal.
{code}
@NonNull
public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
val daemonConf = new Config();
daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);
val clusterParams = new MkClusterParam();
clusterParams.setSupervisors(5);
clusterParams.setPortsPerSupervisor(5);
clusterParams.setDaemonConf(daemonConf);
return createLocalCluster(clusterParams);
}
/**
* Hack to override local cluster workers.
*/
@SneakyThrows
private static LocalCluster createLocalCluster(final MkClusterParam
clusterParams) {
val reference = new AtomicReferenceLocalCluster();
val latch = new CountDownLatch(1);
val thread = new Thread(new Runnable() {
@Override
public void run() {
Testing.withLocalCluster(clusterParams,
new TestJob() {
@Override
@SneakyThrows
public void run(final ILocalCluster cluster) {
reference.set((LocalCluster) cluster);
latch.countDown();
// Wait forever
synchronized (this) {
while (true) {
this.wait();
}
}
}
});
}
});
thread.setDaemon(true);
thread.start();
latch.await();
return reference.get();
}
}
{code}
For greater flexibility, a {{createLocalCluster}} should be created.
Create Testing#createLocalCluster
-
Key: STORM-812
URL: https://issues.apache.org/jira/browse/STORM-812
Project: Apache Storm
Issue Type: Improvement
Reporter: Bob Tiernay
Currently, there is a method called {{Testing#withLocalCluster}} that allows
one to customize the configuration of a {{LocalCluster}} that isn't possible
using