Repository: incubator-eagle
Updated Branches:
  refs/heads/master cb3bf7b60 -> f21145635


EAGLE-675 : AlertEngine: don't host long-lived curator framework for schedule

Author: ralphsu

This closes #626


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f2114563
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f2114563
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f2114563

Branch: refs/heads/master
Commit: f21145635a609dc25477f0384cbb14fb17fbd0be
Parents: cb3bf7b
Author: Ralph, Su <suliang...@gmail.com>
Authored: Tue Nov 8 20:31:57 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Tue Nov 8 21:12:34 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/coordinator/Coordinator.java    | 179 +++++--------------
 .../alert/coordinator/ExclusiveExecutor.java    | 112 ++++++------
 .../coordinator/trigger/CoordinatorTrigger.java |  53 +++---
 .../alert/coordinator/CoordinatorTest.java      |  13 +-
 .../coordinator/TestExclusiveExecutor.java      |  73 ++++----
 .../TestGreedyScheduleCoordinator.java          |  71 ++++----
 .../src/test/resources/test-application.conf    |   2 +-
 7 files changed, 205 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index deeeec9..5ed6700 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -27,32 +27,22 @@ import 
org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordinator.impl.MetadataValdiator;
 import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
 import org.apache.eagle.alert.coordinator.trigger.CoordinatorTrigger;
-import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
-import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * TODO: To simply avoid concurrent call of schdule, make the schedule as 
synchronized. This is not safe when multiple
- * instance, consider a distributed lock for prevent multiple schedule happen 
concurrently.
- *
- * <p>Coordinator is a standalone java application, which listens to policy 
changes and use schedule
- * algorithm to distribute policies 1) reacting to shutdown events 2) start 
non-daemon thread to pull policies
- * and figure out if polices are changed</p>
- * 
- * @since Mar 24, 2016 
+ * @since Mar 24, 2016.
  */
 public class Coordinator {
 
@@ -79,13 +69,10 @@ public class Coordinator {
     private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = 
"metadataDynamicCheck.initDelayMillis";
     private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = 
"metadataDynamicCheck.delayMillis";
 
-    private static final String GREEDY_SCHEDULER_ZK_PATH = 
"/alert/greedy/leader";
-    private static final String POLICY_SCHEDULER_ZK_PATH = 
"/alert/policy/leader";
-    private static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 2000;
-    private static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 90; //about 9 
minutes
+    public static final String GREEDY_SCHEDULER_ZK_PATH = 
"/alert/greedy/leader";
 
     private volatile ScheduleState currentState = null;
-    private final ConfigBusProducer producer;
+    private ZKConfig zkConfig = null;
     private final IMetadataServiceClient client;
     private Config config;
 
@@ -94,44 +81,22 @@ public class Coordinator {
 
     public Coordinator() {
         config = ConfigFactory.load().getConfig(COORDINATOR);
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        producer = new ConfigBusProducer(zkConfig);
+        zkConfig = ZKConfigBuilder.getZKConfig(config);
         client = new MetadataServiceClientImpl(config);
     }
 
-    public Coordinator(Config config, ConfigBusProducer producer, 
IMetadataServiceClient client) {
+    public Coordinator(Config config, ZKConfig zkConfig, 
IMetadataServiceClient client) {
         this.config = config;
-        this.producer = producer;
+        this.zkConfig = zkConfig;
         this.client = client;
     }
 
-    public synchronized ScheduleState schedule(ScheduleOption option) {
-        ScheduleZkState scheduleZkState = new ScheduleZkState();
-        ExclusiveExecutor.Runnable exclusiveRunnable = new 
ExclusiveExecutor.Runnable() {
-            @Override
-            public void run() throws Exception {
-                scheduleZkState.scheduleAcquired = true;
-
-                while (!scheduleZkState.scheduleCompleted) {
-                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
-                }
-            }
-        };
-        ExclusiveExecutor.execute(GREEDY_SCHEDULER_ZK_PATH, exclusiveRunnable);
-        int waitMaxTimes = 0;
-        while (waitMaxTimes < ACQUIRE_LOCK_MAX_RETRIES_TIMES) { //about 3 
minutes waiting
-            if (!scheduleZkState.scheduleAcquired) {
-                waitMaxTimes++;
-                try {
-                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
-                } catch (InterruptedException e) {
-                    // ignored
-                }
-                continue;
-            }
-
-            ScheduleState state = null;
-            try {
+    public synchronized ScheduleState schedule(ScheduleOption option) throws 
TimeoutException {
+        ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
+        AtomicReference<ScheduleState> reference = new AtomicReference<>();
+        try {
+            executor.execute(GREEDY_SCHEDULER_ZK_PATH, () -> {
+                ScheduleState state = null;
                 Stopwatch watch = Stopwatch.createStarted();
                 IScheduleContext context = new ScheduleContextBuilder(config, 
client).buildContext();
                 TopologyMgmtService mgmtService = new TopologyMgmtService();
@@ -146,28 +111,32 @@ public class Coordinator {
                 watch.start();
 
                 // persist & notify
-                postSchedule(client, state, producer);
+                try (ConfigBusProducer producer = new 
ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) {
+                    postSchedule(client, state, producer);
+                }
 
                 watch.stop();
                 long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
                 LOG.info("Schedule result, schedule time {} ms, post schedule 
time {} ms !", scheduleTime, postTime);
-
+                reference.set(state);
                 currentState = state;
-            } finally {
-                //schedule completed
-                scheduleZkState.scheduleCompleted = true;
+            });
+        } catch (TimeoutException e1) {
+            LOG.error("time out when schedule", e1);
+            throw e1;
+        } finally {
+            try {
+                executor.close();
+            } catch (IOException e) {
+                LOG.error("Exception when close exclusive executor, log and 
ignore!", e);
             }
-            return state;
         }
-        throw new LockWebApplicationException("Acquire scheduler lock failed, 
please retry later");
+        return reference.get();
     }
 
     public static void postSchedule(IMetadataServiceClient client, 
ScheduleState state, ConfigBusProducer producer) {
         // persist state
         client.addScheduleState(state);
-        // TODO, see ScheduleState comments on how to better store these 
configs
-        // store policy assignment
-        // store monitored stream
 
         // notify
         ConfigValue value = new ConfigValue();
@@ -224,75 +193,26 @@ public class Coordinator {
         }
     }
 
-    private static class PolicyChangeHandler implements PolicyChangeListener {
-        private static final Logger LOG = 
LoggerFactory.getLogger(PolicyChangeHandler.class);
-        private Config config;
-        private IMetadataServiceClient client;
-
-        public PolicyChangeHandler(Config config, IMetadataServiceClient 
client) {
-            this.config = config;
-            this.client = client;
-        }
-
-        @Override
-        public void onPolicyChange(List<PolicyDefinition> allPolicies, 
Collection<String> addedPolicies,
-                                   Collection<String> removedPolicies, 
Collection<String> modifiedPolicies) {
-            LOG.info("policy changed ... ");
-            LOG.info("allPolicies: " + allPolicies + ", addedPolicies: " + 
addedPolicies + ", removedPolicies: "
-                + removedPolicies + ", modifiedPolicies: " + modifiedPolicies);
-
-            CoordinatorTrigger trigger = new CoordinatorTrigger(config, 
client);
-            trigger.run();
-
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        startSchedule();
-
-        Thread.currentThread().join();
-    }
-
     public static void startSchedule() {
-        ExclusiveExecutor.execute(POLICY_SCHEDULER_ZK_PATH, new 
ExclusiveExecutor.Runnable() {
-
-            @Override
-            public void run() throws Exception {
-                Config config = ConfigFactory.load().getConfig(COORDINATOR);
-                // build dynamic policy loader
-                String host = config.getString(METADATA_SERVICE_HOST);
-                int port = config.getInt(METADATA_SERVICE_PORT);
-                String context = config.getString(METADATA_SERVICE_CONTEXT);
-                IMetadataServiceClient client = new 
MetadataServiceClientImpl(host, port, context);
-                DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
-                loader.addPolicyChangeListener(new PolicyChangeHandler(config, 
client));
-
-                // schedule dynamic policy loader
-                long initDelayMillis = 
config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
-                long delayMillis = 
config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
-                ScheduledExecutorService scheduleSrv = 
Executors.newScheduledThreadPool(2, new ThreadFactory() {
-                    @Override
-                    public Thread newThread(Runnable r) {
-                        Thread t = new Thread(r);
-                        t.setDaemon(true);
-                        return t;
-                    }
-                });
-                scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, 
delayMillis, TimeUnit.MILLISECONDS);
-
-                // disable periodically schedule by default due for the sake 
of Metadata store performance
-                /***
-                scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, 
client), CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_DELAY,
-                    CoordinatorTrigger.INIT_PERIODICALLY_TRIGGER_INTERVAL, 
TimeUnit.MILLISECONDS);
-                ***/
-                
-                Runtime.getRuntime().addShutdownHook(new Thread(new 
CoordinatorShutdownHook(scheduleSrv)));
-                LOG.info("Eagle Coordinator started ...");
-
-                Thread.currentThread().join();
-            }
-
+        Config config = ConfigFactory.load().getConfig(COORDINATOR);
+        String host = config.getString(METADATA_SERVICE_HOST);
+        int port = config.getInt(METADATA_SERVICE_PORT);
+        String context = config.getString(METADATA_SERVICE_CONTEXT);
+        IMetadataServiceClient client = new MetadataServiceClientImpl(host, 
port, context);
+
+        // schedule dynamic policy loader
+        long initDelayMillis = 
config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
+        long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
+        ScheduledExecutorService scheduleSrv = 
Executors.newScheduledThreadPool(2, r -> {
+            Thread t = new Thread(r);
+            t.setDaemon(true);
+            return t;
         });
+
+        scheduleSrv.scheduleAtFixedRate(new CoordinatorTrigger(config, 
client), initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
+
+        Runtime.getRuntime().addShutdownHook(new Thread(new 
CoordinatorShutdownHook(scheduleSrv)));
+        LOG.info("Eagle Coordinator started ...");
     }
 
     public void enforcePeriodicallyBuild() {
@@ -307,9 +227,4 @@ public class Coordinator {
         return forcePeriodicallyBuild.get();
     }
 
-    public static class ScheduleZkState {
-        volatile boolean scheduleAcquired = false;
-        volatile boolean scheduleCompleted = false;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
index 74329e3..86a51d9 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ExclusiveExecutor.java
@@ -16,67 +16,56 @@
  */
 package org.apache.eagle.alert.coordinator;
 
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.curator.RetryPolicy;
+import com.google.common.base.Stopwatch;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import 
org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
 import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.utils.CloseableUtils;
+import org.apache.eagle.alert.config.ZKConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ExclusiveExecutor {
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ExclusiveExecutor implements Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ExclusiveExecutor.class);
 
-    // private static final String PATH = "/alert/listener/leader";
-    private static final String COORDINATOR = "coordinator";
     private static final int ZK_RETRYPOLICY_SLEEP_TIME_MS = 1000;
     private static final int ZK_RETRYPOLICY_MAX_RETRIES = 3;
 
-    private static final CuratorFramework client;
+    public static final int ACQUIRE_LOCK_WAIT_INTERVAL_MS = 3000;
+    public static final int ACQUIRE_LOCK_MAX_RETRIES_TIMES = 100; //about 5 
minutes
+
+    private CuratorFramework client;
+    private LeaderSelector selector;
 
-    static {
-        Config config = ConfigFactory.load().getConfig(COORDINATOR);
-        RetryPolicy retryPolicy = new 
ExponentialBackoffRetry(ZK_RETRYPOLICY_SLEEP_TIME_MS, 
ZK_RETRYPOLICY_MAX_RETRIES);
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        client = CuratorFrameworkFactory.newClient(zkConfig.zkQuorum, 
retryPolicy);
+    public ExclusiveExecutor(ZKConfig zkConfig ) {
+        client = CuratorFrameworkFactory.newClient(
+            zkConfig.zkQuorum,
+            zkConfig.zkSessionTimeoutMs,
+            zkConfig.connectionTimeoutMs,
+            new RetryNTimes(ZK_RETRYPOLICY_MAX_RETRIES, 
ZK_RETRYPOLICY_SLEEP_TIME_MS)
+        );
         client.start();
     }
 
-    public abstract static class Runnable {
-        boolean completed = false;
-        LeaderSelector selector;
-
-        public abstract void run() throws Exception;
-
-        public void registerResources(LeaderSelector selector) {
-            this.selector = selector;
-        }
-
-        public void runElegantly() throws Exception {
-            this.run();
-
-            LOG.info("Close selector resources {}", this.selector);
-            CloseableUtils.closeQuietly(this.selector);
-
-            completed = true;
-        }
-
-        public boolean isCompleted() {
-            return completed;
-        }
-
+    public void execute(String path, final Runnable r) throws TimeoutException 
{
+        execute(path, r, ACQUIRE_LOCK_MAX_RETRIES_TIMES * 
ACQUIRE_LOCK_WAIT_INTERVAL_MS);
     }
 
-    public static void execute(String path, final Runnable runnable) {
+    public void execute(String path, final Runnable r, int timeoutMillis) 
throws TimeoutException {
+        final AtomicBoolean executed = new AtomicBoolean(false);
+        Stopwatch watch = Stopwatch.createUnstarted();
+        watch.start();
         LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() {
 
             @Override
@@ -85,7 +74,9 @@ public class ExclusiveExecutor {
                 // do whatever leader work you need to and only exit
                 // this method when you want to relinquish leadership
                 LOG.info("this is leader node right now..");
-                runnable.runElegantly();
+                r.run();
+                LOG.info("leader node executed done!..");
+                executed.set(true);
             }
 
             @Override
@@ -95,22 +86,41 @@ public class ExclusiveExecutor {
 
         };
 
-        LeaderSelector selector = new LeaderSelector(client, path, listener);
-        selector.autoRequeue(); // not required, but this is behavior that you
+        selector = new LeaderSelector(client, path, listener);
+        // selector.autoRequeue(); // not required, but this is behavior that 
you
         // will probably expect
         selector.start();
 
-        runnable.registerResources(selector);
-
-        Runtime.getRuntime().addShutdownHook(new Thread(new 
java.lang.Runnable() {
-
-            @Override
-            public void run() {
-                LOG.info("Close zk client resources {}", 
ExclusiveExecutor.client);
-                CloseableUtils.closeQuietly(ExclusiveExecutor.client);
+        // wait for given times
+        while (watch.elapsed(TimeUnit.MILLISECONDS) < timeoutMillis) { //about 
5 minutes waiting by default (bounded by timeoutMillis)
+            if (!executed.get()) {
+                try {
+                    Thread.sleep(ACQUIRE_LOCK_WAIT_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    // ignored
+                }
+                continue;
+            } else {
+                break;
             }
+        }
+        watch.stop();
 
-        }));
+        if (!executed.get()) {
+            throw new TimeoutException(String.format("Get exclusive lock for 
operation on path %s failed due to wait too much time: %d ms",
+                path, watch.elapsed(TimeUnit.MILLISECONDS)));
+        }
+        LOG.info("Exclusive operation done with execution time (lock plus 
operation) {} ms !", watch.elapsed(TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (selector != null) {
+            CloseableUtils.closeQuietly(this.selector);
+        }
+        if (client != null) {
+            CloseableUtils.closeQuietly(this.client);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
index e0e0612..454f47c 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/CoordinatorTrigger.java
@@ -16,14 +16,15 @@
  */
 package org.apache.eagle.alert.coordinator.trigger;
 
+import com.google.common.base.Stopwatch;
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.config.ConfigBusProducer;
+import org.apache.eagle.alert.config.ZKConfig;
 import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.coordination.model.ScheduleState;
 import org.apache.eagle.alert.coordinator.*;
 import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
-import com.google.common.base.Stopwatch;
-import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,10 +34,6 @@ import java.util.concurrent.TimeUnit;
  * @since Jun 27, 2016.
  */
 public class CoordinatorTrigger implements Runnable {
-    // TODO : support configurable in coordiantor
-    public static final int INIT_PERIODICALLY_TRIGGER_DELAY = 6000;
-    // 30 minutes a trigger by default
-    public static final int INIT_PERIODICALLY_TRIGGER_INTERVAL = 1000 * 60 * 
30;
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CoordinatorTrigger.class);
 
@@ -50,33 +47,35 @@ public class CoordinatorTrigger implements Runnable {
 
     @Override
     public void run() {
-        try {
-            if (Coordinator.isPeriodicallyForceBuildEnable()) {
-                LOG.info("CoordinatorTrigger started ... ");
-
-                Stopwatch watch = Stopwatch.createStarted();
+        if (Coordinator.isPeriodicallyForceBuildEnable()) {
+            LOG.info("CoordinatorTrigger started ... ");
 
-                // schedule
-                IScheduleContext context = new ScheduleContextBuilder(config, 
client).buildContext();
-                TopologyMgmtService mgmtService = new TopologyMgmtService();
-                IPolicyScheduler scheduler = 
PolicySchedulerFactory.createScheduler();
+            Stopwatch watch = Stopwatch.createStarted();
+            ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+            try (ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig)) 
{
+                executor.execute(Coordinator.GREEDY_SCHEDULER_ZK_PATH, () -> {
+                    // schedule
+                    IScheduleContext context = new 
ScheduleContextBuilder(config, client).buildContext();
+                    TopologyMgmtService mgmtService = new 
TopologyMgmtService();
+                    IPolicyScheduler scheduler = 
PolicySchedulerFactory.createScheduler();
 
-                scheduler.init(context, mgmtService);
+                    scheduler.init(context, mgmtService);
 
-                ScheduleState state = scheduler.schedule(new ScheduleOption());
+                    ScheduleState state = scheduler.schedule(new 
ScheduleOption());
 
-                // use try catch to use AutoCloseable interface to close 
producer automatically
-                try (ConfigBusProducer producer = new 
ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) {
-                    Coordinator.postSchedule(client, state, producer);
-                }
+                    // use try catch to use AutoCloseable interface to close 
producer automatically
+                    try (ConfigBusProducer producer = new 
ConfigBusProducer(ZKConfigBuilder.getZKConfig(config))) {
+                        Coordinator.postSchedule(client, state, producer);
+                    }
 
-                watch.stop();
-                LOG.info("CoordinatorTrigger ended, used time {} sm.", 
watch.elapsed(TimeUnit.MILLISECONDS));
-            } else {
-                LOG.info("CoordinatorTrigger found 
isPeriodicallyForceBuildEnable = false, skipped build");
+                    watch.stop();
+                    LOG.info("CoordinatorTrigger ended, used time {} sm.", 
watch.elapsed(TimeUnit.MILLISECONDS));
+                });
+            } catch (Exception e) {
+                LOG.error("trigger schedule failed!", e);
             }
-        } catch (Exception e) {
-            LOG.error("trigger schedule failed!", e);
+        } else {
+            LOG.info("CoordinatorTrigger found isPeriodicallyForceBuildEnable 
= false, skipped build");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
index f55a53e..7dadbf5 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
@@ -68,10 +68,9 @@ public class CoordinatorTest {
         before();
         Config config = ConfigFactory.load().getConfig("coordinator");
         ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
         IMetadataServiceClient client = new MetadataServiceClientImpl(config);
 
-        Coordinator coordinator = new Coordinator(config, producer, client);
+        Coordinator coordinator = new Coordinator(config, zkConfig, client);
         ScheduleOption option = new ScheduleOption();
         ScheduleState state = coordinator.schedule(option);
         String v = state.getVersion();
@@ -96,10 +95,9 @@ public class CoordinatorTest {
         before();
         Config config = ConfigFactory.load().getConfig("coordinator");
         ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
         IMetadataServiceClient client = 
ScheduleContextBuilderTest.getSampleMetadataService();
 
-        Coordinator coordinator = new Coordinator(config, producer, client);
+        Coordinator coordinator = new Coordinator(config, zkConfig, client);
         ScheduleOption option = new ScheduleOption();
         ScheduleState state = coordinator.schedule(option);
         String v = state.getVersion();
@@ -122,13 +120,6 @@ public class CoordinatorTest {
         Assert.assertTrue(validated.get());
     }
 
-    @Ignore
-    @Test
-    public void test_main() throws Exception {
-        before();
-        Coordinator.main(null);
-    }
-
     @Before
     public void before() {
         System.setProperty("config.resource", "/test-application.conf");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
index 7e170a8..1f3baf5 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestExclusiveExecutor.java
@@ -16,22 +16,20 @@
  */
 package org.apache.alert.coordinator;
 
+import com.google.common.base.Joiner;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
+import org.apache.eagle.alert.utils.ZookeeperEmbedded;
+import org.junit.*;
+
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
-
-import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
-import org.apache.eagle.alert.utils.ZookeeperEmbedded;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import com.google.common.base.Joiner;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 @Ignore
 public class TestExclusiveExecutor {
@@ -59,39 +57,34 @@ public class TestExclusiveExecutor {
 
         System.setOut(newStream);
 
-        ExclusiveExecutor.Runnable runnableOne = new 
ExclusiveExecutor.Runnable() {
-
-            @Override
-            public void run() throws Exception {
-                System.out.println("this is thread one");
+        ZKConfig zkConfig = new ZKConfig();
+        zkConfig.zkQuorum = "127.0.0.1:2181";
+        zkConfig.zkRetryTimes = 3;
+        zkConfig.zkRoot = "/";
+        zkConfig.connectionTimeoutMs  = 3000;
+        zkConfig.zkRetryInterval  = 1000;
+        zkConfig.zkSessionTimeoutMs = 5000;
+
+        String path = "/concurrenty";
+        AtomicBoolean lock1 = new AtomicBoolean(false);
+        Runnable runnableOne = () -> { System.out.println("this is thread 
one"); lock1.set(true);};
+        new Thread(() -> {
+            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
+            try {
+                executor.execute(path, runnableOne);
+            } catch (TimeoutException e) {
             }
-
-        };
-
-        new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                ExclusiveExecutor.execute("/alert/test/leader", runnableOne);
-            }
-
         }).start();
 
-        ExclusiveExecutor.Runnable runnableTwo = new 
ExclusiveExecutor.Runnable() {
 
-            @Override
-            public void run() throws Exception {
-                System.out.println("this is thread two");
+        AtomicBoolean lock2 = new AtomicBoolean();
+        Runnable runnableTwo = () ->  { System.out.println("this is thread 
two"); lock2.set(true);};
+        new Thread(() -> {
+            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
+            try {
+                executor.execute(path, runnableTwo);
+            } catch (TimeoutException e) {
             }
-
-        };
-        new Thread(new Runnable() {
-
-            @Override
-            public void run() {
-                ExclusiveExecutor.execute("/alert/test/leader", runnableTwo);
-            }
-
         }).start();
 
         Thread.sleep(2000);
@@ -110,8 +103,8 @@ public class TestExclusiveExecutor {
         Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread one")));
         Assert.assertTrue(logs.stream().anyMatch((log) -> log.contains("this is thread two")));
 
-        Assert.assertTrue(runnableOne.isCompleted());
-        Assert.assertTrue(runnableTwo.isCompleted());
+        Assert.assertTrue(lock1.get());
+        Assert.assertTrue(lock2.get());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
index 91df334..875bb81 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/TestGreedyScheduleCoordinator.java
@@ -16,54 +16,44 @@
  */
 package org.apache.alert.coordinator;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.coordinator.ExclusiveExecutor;
 import org.apache.eagle.alert.utils.ZookeeperEmbedded;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-public class TestGreedyScheduleCoordinator {
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
-    public static class ScheduleZkState {
-        volatile boolean scheduleAcquired = false;
-        volatile boolean scheduleCompleted = false;
-    }
+public class TestGreedyScheduleCoordinator {
 
     public static class GreedyScheduleCoordinator {
 
-        public int schedule(int input) {
-            ScheduleZkState scheduleZkState = new ScheduleZkState();
-            ExclusiveExecutor.Runnable exclusiveRunnable = new ExclusiveExecutor.Runnable() {
-                @Override
-                public void run() throws Exception {
-                    scheduleZkState.scheduleAcquired = true;
-
-                    while (!scheduleZkState.scheduleCompleted) {
-                        Thread.sleep(2000);
-                    }
-                }
-            };
-            ExclusiveExecutor.execute("/alert/test", exclusiveRunnable);
-            int waitMaxTimes = 0;
-            while (waitMaxTimes < 90) { //about 3 minutes waiting
-                if (!scheduleZkState.scheduleAcquired) {
-                    waitMaxTimes++;
-                    try {
-                        Thread.sleep(2000);
-                    } catch (InterruptedException e) {
-                    }
-                    continue;
-                }
+        public int schedule(int input) throws TimeoutException {
+            Config config = ConfigFactory.load().getConfig("coordinator");
+            ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+            ExclusiveExecutor executor = new ExclusiveExecutor(zkConfig);
+            final AtomicInteger r = new AtomicInteger();
+            executor.execute("/alert/test", () -> {
                 try {
-                    return input;
-                } finally {
-                    //schedule completed
-                    scheduleZkState.scheduleCompleted = true;
+                    Thread.sleep(input);
+                } catch (Exception e){
                 }
+
+                r.set(input);
+            });
+            try {
+                executor.close();
+            } catch (IOException e) {
+                e.printStackTrace();
             }
             throw new RuntimeException("Acquire greedy scheduler lock failed, please retry later");
         }
-
     }
 
     ZookeeperEmbedded zkEmbed;
@@ -90,7 +80,10 @@ public class TestGreedyScheduleCoordinator {
 
             @Override
             public void run() {
-                System.out.println("output: " + coordinator.schedule(1));
+                try {
+                    System.out.println("output: " + coordinator.schedule(1));
+                } catch (TimeoutException e) {
+                }
 
                 try {
                     Thread.sleep(1000);
@@ -104,7 +97,10 @@ public class TestGreedyScheduleCoordinator {
 
             @Override
             public void run() {
-                System.out.println("output: " + coordinator.schedule(2));
+                try {
+                    System.out.println("output: " + coordinator.schedule(2));
+                } catch (TimeoutException e) {
+                }
 
                 try {
                     Thread.sleep(1000);
@@ -118,7 +114,10 @@ public class TestGreedyScheduleCoordinator {
 
             @Override
             public void run() {
-                System.out.println("output: " + coordinator.schedule(3));
+                try {
+                    System.out.println("output: " + coordinator.schedule(3));
+                } catch (TimeoutException e) {
+                }
 
                 try {
                     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f2114563/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
index 963d2ad..566dc83 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/test-application.conf
@@ -36,7 +36,7 @@
     },
     "metadataDynamicCheck": {
       "initDelayMillis": 1000,
-      "delayMillis": 30000
+      "delayMillis": 600000
     },
     "kafkaProducer": {
       "bootstrapServers": "localhost:9092"


Reply via email to