Bob Tiernay created STORM-812:
---------------------------------

             Summary: Create Testing#createLocalCluster
                 Key: STORM-812
                 URL: https://issues.apache.org/jira/browse/STORM-812
             Project: Apache Storm
          Issue Type: Improvement
            Reporter: Bob Tiernay


Currently, there is a method called {{Testing#withLocalCluster}} that allows 
one to customize the configuration of a {{LocalCluster}} that isn't possible 
using {{new LocalCluster}} (e.g. number supervisors, number of workers per 
supervisor, etc.). However, this method controls the lifecycle of the cluster 
and thus is very inconvenient when that is not desired. This has been discussed 
in 

http://storm-user.narkive.com/FPgchFEL/localcluster-topology-not-working-is-there-a-max-number-of-topologies-to-deploy

The workaround required is less than ideal. 

{code}
  @NonNull
  public static LocalCluster createLocalCluster(String zkHost, long zkPort) {
    val daemonConf = new Config();
    daemonConf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
    daemonConf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of(zkHost));
    daemonConf.put(Config.STORM_ZOOKEEPER_PORT, zkPort);

    val clusterParams = new MkClusterParam();
    clusterParams.setSupervisors(5);
    clusterParams.setPortsPerSupervisor(5);
    clusterParams.setDaemonConf(daemonConf);

    return createLocalCluster(clusterParams);
  }

  /**
   * Hack to override local cluster workers.
   */
  @SneakyThrows
  private static LocalCluster createLocalCluster(final MkClusterParam 
clusterParams) {
    val reference = new AtomicReference<LocalCluster>();
    val latch = new CountDownLatch(1);
    val thread = new Thread(new Runnable() {

      @Override
      public void run() {
        Testing.withLocalCluster(clusterParams,
            new TestJob() {

              @Override
              @SneakyThrows
              public void run(final ILocalCluster cluster) {
                reference.set((LocalCluster) cluster);
                latch.countDown();

                // Wait forever
                synchronized (this) {
                  while (true) {
                    this.wait();
                  }
                }
              }

            });

      }

    });

    thread.setDaemon(true);
    thread.start();

    latch.await();
    return reference.get();
  }

}
{code}

For greater flexibility, a {{createLocalCluster}} should be created.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to