Repository: incubator-eagle Updated Branches: refs/heads/master cb3bf7b60 -> f21145635
EAGLE-675 : AlertEngine: don't host long-live curator framework for schedule Author: ralphsu This closes #626 Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f2114563 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f2114563 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f2114563 Branch: refs/heads/master Commit: f21145635a609dc25477f0384cbb14fb17fbd0be Parents: cb3bf7b Author: Ralph, Su <suliang...@gmail.com> Authored: Tue Nov 8 20:31:57 2016 +0800 Committer: Ralph, Su <suliang...@gmail.com> Committed: Tue Nov 8 21:12:34 2016 +0800 ---------------------------------------------------------------------- .../eagle/alert/coordinator/Coordinator.java | 179 +++++-------------- .../alert/coordinator/ExclusiveExecutor.java | 112 ++++++------ .../coordinator/trigger/CoordinatorTrigger.java | 53 +++--- .../alert/coordinator/CoordinatorTest.java | 13 +- .../coordinator/TestExclusiveExecutor.java | 73 ++++---- .../TestGreedyScheduleCoordinator.java | 71 ++++---- .../src/test/resources/test-application.conf | 2 +- 7 files changed, 205 insertions(+), 298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java index deeeec9..5ed6700 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java @@ -27,32 +27,22 @@ import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordinator.impl.MetadataValdiator; import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger; -import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader; -import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener; -import org.apache.eagle.alert.engine.coordinator.PolicyDefinition; import org.apache.eagle.alert.service.IMetadataServiceClient; import org.apache.eagle.alert.service.MetadataServiceClientImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.text.MessageFormat; -import java.util.Collection; -import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** - * TODO: To simply avoid concurrent call of schdule, make the schedule as synchronized. This is not safe when multiple - * instance, consider a distributed lock for prevent multiple schedule happen concurrently. - * - * <p>Coordinator is a standalone java application, which listens to policy changes and use schedule - * algorithm to distribute policies 1) reacting to shutdown events 2) start non-daemon thread to pull policies - * and figure out if polices are changed</p> - * - * @since Mar 24, 2016 + * @since Mar 24, 2016. */ public class Coordinator { @@ -79,13 +69,10 @@ public class Coordinator { private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis"; private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis"; - private static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader"; - private static final String POLICY_SCHEDULER_ZK_PATH = "/alert/policy/leader"; - private static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 2000; - private static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 90; //about 9 minutes + public static final String GREEDY_SCHEDULER_ZK_PATH = "/alert/greedy/leader"; private volatile ScheduleState currentState = null; - private final ConfigBusProducer producer; + private ZKConfig zkConfig = null; private final IMetadataServiceClient client; private Config config; @@ -94,44 +81,22 @@ public class Coordinator { public Coordinator() { config = ConfigFactory.load().getConfig(COORDINATOR); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - producer = new ConfigBusProducer(zkConfig); + zkConfig = ZKConfigBuilder.getZKConfig(config); client = new MetadataServiceClientImpl(config); } - public Coordinator(Config config, ConfigBusProducer producer, IMetadataServiceClient client) { + public Coordinator(Config config, ZKConfig zkConfig, IMetadataServiceClient client) { this.config = config; - this.producer = producer; + this.zkConfig = zkConfig; this.client = client; } - public synchronized ScheduleState schedule(ScheduleOption option) { - ScheduleZkState scheduleZkState = new ScheduleZkState(); - ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() { - @Override - public void run() throws Exception { - scheduleZkState.scheduleAcquired = true; - - while (!scheduleZkState.scheduleCompleted) { - Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS); - } - } - }; - ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable); - int waitMaxTimes = 0; - while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 minutes waiting - if (!scheduleZkState.scheduleAcquired) { - waitMaxTimes++; - try { - Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS); - } catch (InterruptedException e) { - // ignored - } - continue; - } - - ScheduleState state = null; - try { + public synchronized ScheduleState schedule(ScheduleOption option) throws TimeoutException { + ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig); + AtomicReference<ScheduleState> reference = new AtomicReference<>(); + try { + executor.execute(GREEDY_SCHEDULER_ZK_PATH, () -> { + ScheduleState state = null; Stopwatch watch = Stopwatch.createStarted(); IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext(); TopologyMgmtService mgmtService = new TopologyMgmtService(); @@ -146,28 +111,32 @@ public class Coordinator { watch.start(); // persist & notify - postSchedule(client, state, producer); + try (ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) { + postSchedule(client, state, producer); + } watch.stop(); long postTime = watch.elapsed(TimeUnit.MILLISECONDS); LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime); - + reference.set(state); currentState = state; - } finally { - //schedule completed - scheduleZkState.scheduleCompleted = true; + }); + } catch (TimeoutException e1) { + LOG.error("time out when schedule", e1); + throw e1; + } finally { + try { + executor.close(); + } catch (IOException e) { + LOG.error("Exception when close exclusive executor, log and ignore!", e); } - return state; } - throw new LockWebApplicationException("Acquire scheduler lock failed, please retry later"); + return reference.get(); } public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) { // persist state client.addScheduleState(state); - // TODO, see ScheduleState comments on how to better store these configs - // store policy assignment - // store monitored stream // notify ConfigValue value = new ConfigValue(); @@ -224,75 +193,26 @@ public class Coordinator { } } - private static class PolicyChangeHandler implements PolicyChangeListener { - private static final Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class); - private Config config; - private IMetadataServiceClient client; - - public PolicyChangeHandler(Config config, IMetadataServiceClient client) { - this.config = config; - this.client = client; - } - - @Override - public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, - Collection<String> removedPolicies, Collection<String> modifiedPolicies) { - LOG.info("policy changed ... "); - LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + addedPolicies + ", removedPolicies: " - + removedPolicies + ", modifiedPolicies: " + modifiedPolicies); - - CoordinatorTrigger trigger = new CoordinatorTrigger(config, client); - trigger.run(); - - } - } - - public static void main(String[] args) throws Exception { - startSchedule(); - - Thread.currentThread().join(); - } - public static void startSchedule() { - ExclusiveExecutor.execute(POLICY_SCHEDULER_ZK_PATH, new ExclusiveExecutor.Runnable() { - - @Override - public void run() throws Exception { - Config config = ConfigFactory.load().getConfig(COORDINATOR); - // build dynamic policy loader - String host = config.getString(METADATA_SERVICE_HOST); - int port = config.getInt(METADATA_SERVICE_PORT); - String context = config.getString(METADATA_SERVICE_CONTEXT); - IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context); - DynamicPolicyLoader loader = new DynamicPolicyLoader(client); - loader.addPolicyChangeListener(new PolicyChangeHandler(config, client)); - - // schedule dynamic policy loader - long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS); - long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS); - ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r); - t.setDaemon(true); - return t; - } - }); - scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS); - - // disable periodically schedule by default due for the sake of Metadata store performance - /*** - scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY, - CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, TimeUnit.MILLISECONDS); - ***/ - - Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv))); - LOG.info("Eagle Coordinator started ..."); - - Thread.currentThread().join(); - } - + Config config = ConfigFactory.load().getConfig(COORDINATOR); + String host = config.getString(METADATA_SERVICE_HOST); + int port = config.getInt(METADATA_SERVICE_PORT); + String context = config.getString(METADATA_SERVICE_CONTEXT); + IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context); + + // schedule dynamic policy loader + long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS); + long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS); + ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, r -> { + Thread t = new Thread(r); + t.setDaemon(true); + return t; }); + + scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, client), initDelayMillis, delayMillis, TimeUnit.MILLISECONDS); + + Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv))); + LOG.info("Eagle Coordinator started ..."); } public void enforcePeriodicallyBuild() { @@ -307,9 +227,4 @@ public class Coordinator { return forcePeriodicallyBuild.get(); } - public static class ScheduleZkState { - volatile boolean scheduleAcquired = false; - volatile boolean scheduleCompleted = false; - } - } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java index 74329e3..86a51d9 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java @@ -16,67 +16,56 @@ */ package org.apache.eagle.alert.coordinator; -import org.apache.eagle.alert.config.ZKConfig; -import org.apache.eagle.alert.config.ZKConfigBuilder; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import org.apache.curator.RetryPolicy; +import com.google.common.base.Stopwatch; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.CloseableUtils; +import org.apache.eagle.alert.config.ZKConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ExclusiveExecutor { +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ExclusiveExecutor implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(ExclusiveExecutor.class); - // private static final String PATH = "/alert/listener/leader"; - private static final String COORDINATOR = "coordinator"; private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000; private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3; - private static final CuratorFramework client; + public static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 3000; + public static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 100; //about 5 minutes + + private CuratorFramework client; + private LeaderSelector selector; - static { - Config config = ConfigFactory.load().getConfig(COORDINATOR); - RetryPolicy retryPolicy = new ExponentialBackoffRetry(ZK_RETRYPOLICY_SLEEP_TIME_MS, ZK_RETRYPOLICY_MAX_RETRIES); - ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - client = CuratorFrameworkFactory.newClient(zkConfig.zkQuorum, retryPolicy); + public ExclusiveExecutor(ZKConfig zkConfig ) { + client = CuratorFrameworkFactory.newClient( + zkConfig.zkQuorum, + zkConfig.zkSessionTimeoutMs, + zkConfig.connectionTimeoutMs, + new RetryNTimes(ZK_RETRYPOLICY_MAX_RETRIES, ZK_RETRYPOLICY_SLEEP_TIME_MS) + ); client.start(); } - public abstract static class Runnable { - boolean completed = false; - LeaderSelector selector; - - public abstract void run() throws Exception; - - public void registerResources(LeaderSelector selector) { - this.selector = selector; - } - - public void runElegantly() throws Exception { - this.run(); - - LOG.info("Close selector resources {}", this.selector); - CloseableUtils.closeQuietly(this.selector); - - completed = true; - } - - public boolean isCompleted() { - return completed; - } - + public void execute(String path, final Runnable r) throws TimeoutException { + execute(path, r, ACQUIRE_LOCK_MAX_RETRIES_TIMES * ACQUIRE_LOCK_WAIT_INTERVAL_MS); } - public static void execute(String path, final Runnable runnable) { + public void execute(String path, final Runnable r, int timeoutMillis) throws TimeoutException { + final AtomicBoolean executed = new AtomicBoolean(false); + Stopwatch watch = Stopwatch.createUnstarted(); + watch.start(); LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() { @Override @@ -85,7 +74,9 @@ public class ExclusiveExecutor { // do whatever leader work you need to and only exit // this method when you want to relinquish leadership LOG.info("this is leader node right now.."); - runnable.runElegantly(); + r.run(); + LOG.info("leader node executed done!.."); + executed.set(true); } @Override @@ -95,22 +86,41 @@ public class ExclusiveExecutor { }; - LeaderSelector selector = new LeaderSelector(client, path, listener); - selector.autoRequeue(); // not required, but this is behavior that you + selector = new LeaderSelector(client, path, listener); + // selector.autoRequeue(); // not required, but this is behavior that you // will probably expect selector.start(); - runnable.registerResources(selector); - - Runtime.getRuntime().addShutdownHook(new Thread(new java.lang.Runnable() { - - @Override - public void run() { - LOG.info("Close zk client resources {}", ExclusiveExecutor.client); - CloseableUtils.closeQuietly(ExclusiveExecutor.client); + // wait for given times + while (watch.elapsed(TimeUnit.MILLISECONDS) < timeoutMillis) { //about 3 minutes waiting + if (!executed.get()) { + try { + Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS); + } catch (InterruptedException e) { + // ignored + } + continue; + } else { + break; } + } + watch.stop(); - })); + if (!executed.get()) { + throw new TimeoutException(String.format("Get exclusive lock for operation on path %s failed due to wait too much time: %d ms", + path, watch.elapsed(TimeUnit.MILLISECONDS))); + } + LOG.info("Exclusive operation done with execution time (lock plus operation) {} ms !", watch.elapsed(TimeUnit.MILLISECONDS)); + } + + @Override + public void close() throws IOException { + if (selector != null) { + CloseableUtils.closeQuietly(this.selector); + } + if (client != null) { + CloseableUtils.closeQuietly(this.client); + } } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java index e0e0612..454f47c 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java @@ -16,14 +16,15 @@ */ package org.apache.eagle.alert.coordinator.trigger; +import com.google.common.base.Stopwatch; +import com.typesafe.config.Config; import org.apache.eagle.alert.config.ConfigBusProducer; +import org.apache.eagle.alert.config.ZKConfig; import org.apache.eagle.alert.config.ZKConfigBuilder; import org.apache.eagle.alert.coordination.model.ScheduleState; import org.apache.eagle.alert.coordinator.*; import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder; import org.apache.eagle.alert.service.IMetadataServiceClient; -import com.google.common.base.Stopwatch; -import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,10 +34,6 @@ import java.util.concurrent.TimeUnit; * @since Jun 27, 2016. */ public class CoordinatorTrigger implements Runnable { - // TODO : support configurable in coordiantor - public static final int INIT_PERIODICALLY_TRIGGER_DELAY = 6000; - // 30 minutes a trigger by default - public static final int INIT_PERIODICALLY_TRIGGER_INTERVAL = 1000 * 60 * 30; private static final Logger LOG = LoggerFactory.getLogger(CoordinatorTrigger.class); @@ -50,33 +47,35 @@ public class CoordinatorTrigger implements Runnable { @Override public void run() { - try { - if (Coordinator.isPeriodicallyForceBuildEnable()) { - LOG.info("CoordinatorTrigger started ... "); - - Stopwatch watch = Stopwatch.createStarted(); + if (Coordinator.isPeriodicallyForceBuildEnable()) { + LOG.info("CoordinatorTrigger started ... "); - // schedule - IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext(); - TopologyMgmtService mgmtService = new TopologyMgmtService(); - IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler(); + Stopwatch watch = Stopwatch.createStarted(); + ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); + try (ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig)) { + executor.execute(Coordinator.GREEDY_SCHEDULER_ZK_PATH, () -> { + // schedule + IScheduleContext context = new ScheduleContextBuilder(config, client).buildContext(); + TopologyMgmtService mgmtService = new TopologyMgmtService(); + IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler(); - scheduler.init(context, mgmtService); + scheduler.init(context, mgmtService); - ScheduleState state = scheduler.schedule(new ScheduleOption()); + ScheduleState state = scheduler.schedule(new ScheduleOption()); - // use try catch to use AutoCloseable interface to close producer automatically - try (ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) { - Coordinator.postSchedule(client, state, producer); - } + // use try catch to use AutoCloseable interface to close producer automatically + try (ConfigBusProducer producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) { + Coordinator.postSchedule(client, state, producer); + } - watch.stop(); - LOG.info("CoordinatorTrigger ended, used time {} sm.", watch.elapsed(TimeUnit.MILLISECONDS)); - } else { - LOG.info("CoordinatorTrigger found isPeriodicallyForceBuildEnable = false, skipped build"); + watch.stop(); + LOG.info("CoordinatorTrigger ended, used time {} sm.", watch.elapsed(TimeUnit.MILLISECONDS)); + }); + } catch (Exception e) { + LOG.error("trigger schedule failed!", e); } - } catch (Exception e) { - LOG.error("trigger schedule failed!", e); + } else { + LOG.info("CoordinatorTrigger found isPeriodicallyForceBuildEnable = false, skipped build"); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java index f55a53e..7dadbf5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java @@ -68,10 +68,9 @@ public class CoordinatorTest { before(); Config config = ConfigFactory.load().getConfig("coordinator"); ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - ConfigBusProducer producer = new ConfigBusProducer(zkConfig); IMetadataServiceClient client = new MetadataServiceClientImpl(config); - Coordinator coordinator = new Coordinator(config, producer, client); + Coordinator coordinator = new Coordinator(config, zkConfig, client); ScheduleOption option = new ScheduleOption(); ScheduleState state = coordinator.schedule(option); String v = state.getVersion(); @@ -96,10 +95,9 @@ public class CoordinatorTest { before(); Config config = ConfigFactory.load().getConfig("coordinator"); ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); - ConfigBusProducer producer = new ConfigBusProducer(zkConfig); IMetadataServiceClient client = ScheduleContextBuilderTest.getSampleMetadataService(); - Coordinator coordinator = new Coordinator(config, producer, client); + Coordinator coordinator = new Coordinator(config, zkConfig, client); ScheduleOption option = new ScheduleOption(); ScheduleState state = coordinator.schedule(option); String v = state.getVersion(); @@ -122,13 +120,6 @@ public class CoordinatorTest { Assert.assertTrue(validated.get()); } - @Ignore - @Test - public void test_main() throws Exception { - before(); - Coordinator.main(null); - } - @Before public void before() { System.setProperty("config.resource", "/test-application.conf"); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java index 7e170a8..1f3baf5 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java @@ -16,22 +16,20 @@ */ package org.apache.alert.coordinator; +import com.google.common.base.Joiner; +import org.apache.eagle.alert.config.ZKConfig; +import org.apache.eagle.alert.coordinator.ExclusiveExecutor; +import org.apache.eagle.alert.utils.ZookeeperEmbedded; +import org.junit.*; + import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.io.StringReader; import java.util.ArrayList; import java.util.List; - -import org.apache.eagle.alert.coordinator.ExclusiveExecutor; -import org.apache.eagle.alert.utils.ZookeeperEmbedded; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; - -import com.google.common.base.Joiner; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; @Ignore public class TestExclusiveExecutor { @@ -59,39 +57,34 @@ public class TestExclusiveExecutor { System.setOut(newStream); - ExclusiveExecutor.Runnable runnableOne = new ExclusiveExecutor.Runnable() { - - @Override - public void run() throws Exception { - System.out.println("this is thread one"); + ZKConfig zkConfig = new ZKConfig(); + zkConfig.zkQuorum = "127.0.0.1:2181"; + zkConfig.zkRetryTimes = 3; + zkConfig.zkRoot = "/"; + zkConfig.connectionTimeoutMs = 3000; + zkConfig.zkRetryInterval = 1000; + zkConfig.zkSessionTimeoutMs = 5000; + + String path = "/concurrenty"; + AtomicBoolean lock1 = new AtomicBoolean(false); + Runnable runnableOne = () -> { System.out.println("this is thread one"); lock1.set(true);}; + new Thread(() -> { + ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig); + try { + executor.execute(path, runnableOne); + } catch (TimeoutException e) { } - - }; - - new Thread(new Runnable() { - - @Override - public void run() { - ExclusiveExecutor.execute("/alert/test/leader", runnableOne); - } - }).start(); - ExclusiveExecutor.Runnable runnableTwo = new ExclusiveExecutor.Runnable() { - @Override - public void run() throws Exception { - System.out.println("this is thread two"); + AtomicBoolean lock2 = new AtomicBoolean(); + Runnable runnableTwo = () -> { System.out.println("this is thread two"); lock2.set(true);}; + new Thread(() -> { + ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig); + try { + executor.execute(path, runnableTwo); + } catch (TimeoutException e) { } - - }; - new Thread(new Runnable() { - - @Override - public void run() { - ExclusiveExecutor.execute("/alert/test/leader", runnableTwo); - } - }).start(); Thread.sleep(2000); @@ -110,8 +103,8 @@ public class TestExclusiveExecutor { Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread one"))); Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread two"))); - Assert.assertTrue(runnableOne.isCompleted()); - Assert.assertTrue(runnableTwo.isCompleted()); + Assert.assertTrue(lock1.get()); + Assert.assertTrue(lock2.get()); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java index 91df334..875bb81 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java @@ -16,54 +16,44 @@ */ package org.apache.alert.coordinator; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.config.ZKConfig; +import org.apache.eagle.alert.config.ZKConfigBuilder; import org.apache.eagle.alert.coordinator.ExclusiveExecutor; import org.apache.eagle.alert.utils.ZookeeperEmbedded; import org.junit.After; import org.junit.Before; import org.junit.Test; -public class TestGreedyScheduleCoordinator { +import java.io.IOException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; - public static class ScheduleZkState { - volatile boolean scheduleAcquired = false; - volatile boolean scheduleCompleted = false; - } +public class TestGreedyScheduleCoordinator { public static class GreedyScheduleCoordinator { - public int schedule(int input) { - ScheduleZkState scheduleZkState = new ScheduleZkState(); - ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() { - @Override - public void run() throws Exception { - scheduleZkState.scheduleAcquired = true; - - while (!scheduleZkState.scheduleCompleted) { - Thread.sleep(2000); - } - } - }; - ExclusiveExecutor.execute("/alert/test", exclusiveRunnable); - int waitMaxTimes = 0; - while (waitMaxTimes < 90) { //about 3 minutes waiting - if (!scheduleZkState.scheduleAcquired) { - waitMaxTimes++; - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - } - continue; - } + public int schedule(int input) throws TimeoutException { + Config config = ConfigFactory.load().getConfig("coordinator"); + ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config); + ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig); + final AtomicInteger r = new AtomicInteger(); + executor.execute("/alert/test", () -> { try { - return input; - } finally { - //schedule completed - scheduleZkState.scheduleCompleted = true; + Thread.sleep(input); + } catch (Exception e){ } + + r.set(input); + }); + try { + executor.close(); + } catch (IOException e) { + e.printStackTrace(); } throw new RuntimeException("Acquire greedy scheduler lock failed, please retry later"); } - } ZookeeperEmbedded zkEmbed; @@ -90,7 +80,10 @@ public class TestGreedyScheduleCoordinator { @Override public void run() { - System.out.println("output: " + coordinator.schedule(1)); + try { + System.out.println("output: " + coordinator.schedule(1)); + } catch (TimeoutException e) { + } try { Thread.sleep(1000); @@ -104,7 +97,10 @@ public class TestGreedyScheduleCoordinator { @Override public void run() { - System.out.println("output: " + coordinator.schedule(2)); + try { + System.out.println("output: " + coordinator.schedule(2)); + } catch (TimeoutException e) { + } try { Thread.sleep(1000); @@ -118,7 +114,10 @@ public class TestGreedyScheduleCoordinator { @Override public void run() { - System.out.println("output: " + coordinator.schedule(3)); + try { + System.out.println("output: " + coordinator.schedule(3)); + } catch (TimeoutException e) { + } try { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf index 963d2ad..566dc83 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf @@ -36,7 +36,7 @@ }, "metadataDynamicCheck": { "initDelayMillis": 1000, - "delayMillis": 30000 + "delayMillis": 600000 }, "kafkaProducer": { "bootstrapServers": "localhost:9092"