This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git


The following commit(s) were added to refs/heads/master by this push:
     new 6882df0  Add cloud app configuration & disable listener (#1321)
6882df0 is described below

commit 6882df0662fd5ea961be50610242d91862158d75
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Aug 3 13:27:05 2020 +0800

    Add cloud app configuration & disable listener (#1321)
    
    * Add cloud app configuration listener
    
    * Rename test method name
    
    * Rename test method name
---
 .../config/app/CloudAppConfigurationListener.java  | 100 +++++++++++++++++
 .../cloud/scheduler/mesos/SchedulerService.java    |  12 ++
 .../state/disable/app/CloudAppDisableListener.java | 106 ++++++++++++++++++
 .../app/CloudAppConfigurationListenerTest.java     | 105 ++++++++++++++++++
 .../scheduler/mesos/SchedulerServiceTest.java      |  11 +-
 .../disable/app/CloudAppDisableListenerTest.java   | 123 +++++++++++++++++++++
 6 files changed, 456 insertions(+), 1 deletion(-)

diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListener.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListener.java
new file mode 100644
index 0000000..a501747
--- /dev/null
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListener.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app;
+
+import com.google.gson.JsonParseException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.mesos.Protos;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos.MesosStateService;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.Collection;
+import java.util.concurrent.Executors;
+
+/**
+ * Cloud app configuration change listener.
+ */
+@Slf4j
+public final class CloudAppConfigurationListener implements 
CuratorCacheListener {
+    
+    private final CoordinatorRegistryCenter regCenter;
+    
+    private final ProducerManager producerManager;
+    
+    private MesosStateService mesosStateService;
+    
+    public CloudAppConfigurationListener(final CoordinatorRegistryCenter 
regCenter, final ProducerManager producerManager) {
+        this.regCenter = regCenter;
+        this.producerManager = producerManager;
+        mesosStateService = new MesosStateService(regCenter);
+    }
+    
+    @Override
+
+    public void event(final Type type, final ChildData oldData, final 
ChildData data) {
+        String path = data.getPath();
+        if (Type.NODE_DELETED == type && isJobAppConfigNode(path)) {
+            String appName = 
path.substring(CloudAppConfigurationNode.ROOT.length() + 1);
+            stopExecutors(appName);
+        }
+    }
+    
+    private boolean isJobAppConfigNode(final String path) {
+        return path.startsWith(CloudAppConfigurationNode.ROOT) && 
path.length() > CloudAppConfigurationNode.ROOT.length();
+    }
+    
+    /**
+     * Start the listener service of the cloud job service.
+     */
+    public void start() {
+        getCache().listenable().addListener(this, 
Executors.newSingleThreadExecutor());
+    }
+    
+    /**
+     * Stop the listener service of the cloud job service.
+     */
+    public void stop() {
+        getCache().listenable().removeListener(this);
+    }
+    
+    private CuratorCache getCache() {
+        CuratorCache result = (CuratorCache) 
regCenter.getRawCache(CloudAppConfigurationNode.ROOT);
+        if (null != result) {
+            return result;
+        }
+        regCenter.addCacheData(CloudAppConfigurationNode.ROOT);
+        return (CuratorCache) 
regCenter.getRawCache(CloudAppConfigurationNode.ROOT);
+    }
+    
+    private void stopExecutors(final String appName) {
+        try {
+            Collection<MesosStateService.ExecutorStateInfo> executorBriefInfo 
= mesosStateService.executors(appName);
+            for (MesosStateService.ExecutorStateInfo each : executorBriefInfo) 
{
+                
producerManager.sendFrameworkMessage(Protos.ExecutorID.newBuilder().setValue(each.getId()).build(),
+                        
Protos.SlaveID.newBuilder().setValue(each.getSlaveId()).build(), 
"STOP".getBytes());
+            }
+        } catch (final JsonParseException ex) {
+            throw new JobSystemException(ex);
+        }
+    }
+}
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
index 2cf1092..aed52ad 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
@@ -26,11 +26,13 @@ import org.apache.mesos.MesosSchedulerDriver;
 import org.apache.mesos.Protos;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.shardingsphere.elasticjob.cloud.console.ConsoleBootstrap;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfigurationListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.MesosConfiguration;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.ha.FrameworkIDService;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app.CloudAppDisableListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
@@ -66,6 +68,10 @@ public final class SchedulerService {
     
     private final CloudJobDisableListener cloudJobDisableListener;
     
+    private final CloudAppConfigurationListener cloudAppConfigurationListener;
+    
+    private final CloudAppDisableListener cloudAppDisableListener;
+    
     public SchedulerService(final CoordinatorRegistryCenter regCenter) {
         env = BootstrapEnvironment.getINSTANCE();
         facadeService = new FacadeService(regCenter);
@@ -76,6 +82,8 @@ public final class SchedulerService {
         producerManager = new ProducerManager(schedulerDriver, regCenter);
         cloudJobConfigurationListener = new 
CloudJobConfigurationListener(regCenter, producerManager);
         cloudJobDisableListener = new CloudJobDisableListener(regCenter, 
producerManager);
+        cloudAppConfigurationListener = new 
CloudAppConfigurationListener(regCenter, producerManager);
+        cloudAppDisableListener = new CloudAppDisableListener(regCenter, 
producerManager);
         taskLaunchScheduledService = new 
TaskLaunchScheduledService(schedulerDriver, taskScheduler, facadeService, 
jobEventBus);
         reconcileService = new ReconcileService(schedulerDriver, 
facadeService);
         consoleBootstrap = new ConsoleBootstrap(regCenter, 
env.getRestfulServerConfiguration(), producerManager, reconcileService);
@@ -120,7 +128,9 @@ public final class SchedulerService {
         producerManager.startup();
         statisticManager.startup();
         cloudJobConfigurationListener.start();
+        cloudAppConfigurationListener.start();
         cloudJobDisableListener.start();
+        cloudAppDisableListener.start();
         taskLaunchScheduledService.startAsync();
         consoleBootstrap.start();
         schedulerDriver.start();
@@ -136,7 +146,9 @@ public final class SchedulerService {
         consoleBootstrap.stop();
         taskLaunchScheduledService.stopAsync();
         cloudJobConfigurationListener.stop();
+        cloudAppConfigurationListener.stop();
         cloudJobDisableListener.stop();
+        cloudAppDisableListener.stop();
         statisticManager.shutdown();
         producerManager.shutdown();
         schedulerDriver.stop(true);
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListener.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListener.java
new file mode 100644
index 0000000..4610be7
--- /dev/null
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListener.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import 
org.apache.shardingsphere.elasticjob.cloud.config.pojo.CloudJobConfigurationPOJO;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+
+/**
+ * Cloud app disable listener.
+ */
+public final class CloudAppDisableListener implements CuratorCacheListener {
+    
+    private final CoordinatorRegistryCenter regCenter;
+    
+    private final ProducerManager producerManager;
+    
+    private final CloudJobConfigurationService jobConfigService;
+    
+    public CloudAppDisableListener(final CoordinatorRegistryCenter regCenter, 
final ProducerManager producerManager) {
+        this.regCenter = regCenter;
+        this.producerManager = producerManager;
+        jobConfigService = new CloudJobConfigurationService(regCenter);
+    }
+    
+    @Override
+    public void event(final Type type, final ChildData oldData, final 
ChildData data) {
+        String path = data.getPath();
+        if (Type.NODE_CREATED == type && isAppDisableNode(path)) {
+            String appName = path.substring(DisableAppNode.ROOT.length() + 1);
+            if (Objects.nonNull(appName)) {
+                disableApp(appName);
+            }
+        } else if (Type.NODE_DELETED == type && isAppDisableNode(path)) {
+            String appName = path.substring(DisableAppNode.ROOT.length() + 1);
+            if (Objects.nonNull(appName)) {
+                enableApp(appName);
+            }
+        }
+    }
+    
+    private boolean isAppDisableNode(final String path) {
+        return path.startsWith(DisableAppNode.ROOT) && path.length() > 
DisableAppNode.ROOT.length();
+    }
+    
+    /**
+     * Start the listener service of the cloud job service.
+     */
+    public void start() {
+        getCache().listenable().addListener(this, 
Executors.newSingleThreadExecutor());
+    }
+    
+    /**
+     * Stop the listener service of the cloud job service.
+     */
+    public void stop() {
+        getCache().listenable().removeListener(this);
+    }
+    
+    private CuratorCache getCache() {
+        CuratorCache result = (CuratorCache) 
regCenter.getRawCache(DisableAppNode.ROOT);
+        if (null != result) {
+            return result;
+        }
+        regCenter.addCacheData(DisableAppNode.ROOT);
+        return (CuratorCache) regCenter.getRawCache(DisableAppNode.ROOT);
+    }
+    
+    private void disableApp(final String appName) {
+        for (CloudJobConfigurationPOJO each : jobConfigService.loadAll()) {
+            if (appName.equals(each.getAppName())) {
+                producerManager.unschedule(each.getJobName());
+            }
+        }
+    }
+    
+    private void enableApp(final String appName) {
+        for (CloudJobConfigurationPOJO each : jobConfigService.loadAll()) {
+            if (appName.equals(each.getAppName())) {
+                producerManager.reschedule(each.getJobName());
+            }
+        }
+    }
+}
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListenerTest.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListenerTest.java
new file mode 100644
index 0000000..2167b0c
--- /dev/null
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListenerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.mesos.Protos;
+import org.apache.shardingsphere.elasticjob.cloud.ReflectionUtils;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListenerTest;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.EmbedTestingServer;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos.MesosStateService;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class CloudAppConfigurationListenerTest {
+    
+    private static ZookeeperRegistryCenter regCenter;
+    
+    @Mock
+    private ProducerManager producerManager;
+    
+    @Mock
+    private MesosStateService mesosStateService;
+    
+    @InjectMocks
+    private CloudAppConfigurationListener cloudAppConfigurationListener;
+    
+    @Before
+    public void setUp() {
+        ReflectionUtils.setFieldValue(cloudAppConfigurationListener, 
"producerManager", producerManager);
+        ReflectionUtils.setFieldValue(cloudAppConfigurationListener, 
"mesosStateService", mesosStateService);
+        initRegistryCenter();
+        ReflectionUtils.setFieldValue(cloudAppConfigurationListener, 
"regCenter", regCenter);
+    }
+    
+    private void initRegistryCenter() {
+        EmbedTestingServer.start();
+        ZookeeperConfiguration configuration = new 
ZookeeperConfiguration(EmbedTestingServer.getConnectionString(), 
CloudJobConfigurationListenerTest.class.getName());
+        configuration.setDigest("digest:password");
+        configuration.setSessionTimeoutMilliseconds(5000);
+        configuration.setConnectionTimeoutMilliseconds(5000);
+        regCenter = new ZookeeperRegistryCenter(configuration);
+        regCenter.init();
+    }
+    
+    @Test
+    public void assertRemoveWithInvalidPath() {
+        
cloudAppConfigurationListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/other/test_app", null, "".getBytes()));
+        verify(mesosStateService, times(0)).executors(ArgumentMatchers.any());
+        verify(producerManager, 
times(0)).sendFrameworkMessage(any(Protos.ExecutorID.class), 
any(Protos.SlaveID.class), any());
+    }
+    
+    @Test
+    public void assertRemoveWithNoAppNamePath() {
+        
cloudAppConfigurationListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/config/app", null, "".getBytes()));
+        verify(mesosStateService, times(0)).executors(ArgumentMatchers.any());
+        verify(producerManager, 
times(0)).sendFrameworkMessage(any(Protos.ExecutorID.class), 
any(Protos.SlaveID.class), any());
+    }
+    
+    @Test
+    public void assertRemoveApp() {
+        
cloudAppConfigurationListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/config/app/test_app", null, "".getBytes()));
+        verify(mesosStateService).executors("test_app");
+    }
+    
+    @Test
+    public void start() {
+        cloudAppConfigurationListener.start();
+    }
+    
+    @Test
+    public void stop() {
+        regCenter.addCacheData(CloudAppConfigurationNode.ROOT);
+        ReflectionUtils.setFieldValue(cloudAppConfigurationListener, 
"regCenter", regCenter);
+        cloudAppConfigurationListener.stop();
+    }
+}
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
index 345a578..adcf05c 100755
--- a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
@@ -20,10 +20,12 @@ package org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos;
 import com.google.common.util.concurrent.Service;
 import org.apache.mesos.SchedulerDriver;
 import org.apache.shardingsphere.elasticjob.cloud.console.ConsoleBootstrap;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfigurationListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.FrameworkConfiguration;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app.CloudAppDisableListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
 import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
 import org.junit.Before;
@@ -71,13 +73,20 @@ public class SchedulerServiceTest {
     @Mock
     private CloudJobDisableListener cloudJobDisableListener;
     
+    @Mock
+    private CloudAppConfigurationListener cloudAppConfigurationListener;
+    
+    @Mock
+    private CloudAppDisableListener cloudAppDisableListener;
+    
     private SchedulerService schedulerService;
     
     @Before
     public void setUp() {
         schedulerService = new SchedulerService(env, facadeService, 
schedulerDriver,
                 producerManager, statisticManager, 
cloudJobConfigurationListener,
-                taskLaunchScheduledService, consoleBootstrap, 
reconcileService, cloudJobDisableListener);
+                taskLaunchScheduledService, consoleBootstrap, 
reconcileService, cloudJobDisableListener,
+                cloudAppConfigurationListener, cloudAppDisableListener);
     }
     
     @Test
diff --git a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListenerTest.java b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListenerTest.java
new file mode 100644
index 0000000..7788cb6
--- /dev/null
+++ b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListenerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.shardingsphere.elasticjob.cloud.ReflectionUtils;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListenerTest;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.EmbedTestingServer;
+import 
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import 
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class CloudAppDisableListenerTest {
+    
+    private static ZookeeperRegistryCenter regCenter;
+    
+    @Mock
+    private ProducerManager producerManager;
+    
+    @Mock
+    private CloudJobConfigurationService jobConfigService;
+    
+    @InjectMocks
+    private CloudAppDisableListener cloudAppDisableListener;
+    
+    @Before
+    public void setUp() {
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, 
"producerManager", producerManager);
+        initRegistryCenter();
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, "regCenter", 
regCenter);
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, 
"jobConfigService", jobConfigService);
+    }
+    
+    private void initRegistryCenter() {
+        EmbedTestingServer.start();
+        ZookeeperConfiguration configuration = new 
ZookeeperConfiguration(EmbedTestingServer.getConnectionString(), 
CloudJobConfigurationListenerTest.class.getName());
+        configuration.setDigest("digest:password");
+        configuration.setSessionTimeoutMilliseconds(5000);
+        configuration.setConnectionTimeoutMilliseconds(5000);
+        regCenter = new ZookeeperRegistryCenter(configuration);
+        regCenter.init();
+    }
+    
+    @Test
+    public void assertDisableWithInvalidPath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, 
null, new ChildData("/other/test_app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertDisableWithNoAppNamePath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, 
null, new ChildData("/state/disable/app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertDisable() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, 
null, new ChildData("/state/disable/app/app_test", null, "".getBytes()));
+        verify(jobConfigService).loadAll();
+    }
+    
+    @Test
+    public void assertEnableWithInvalidPath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/other/test_app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertEnableWithNoAppNamePath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/state/disable/app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+    }
+    
+    @Test
+    public void assertEnable() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, 
null, new ChildData("/state/disable/app/app_test", null, "".getBytes()));
+        verify(jobConfigService).loadAll();
+    }
+    
+    @Test
+    public void start() {
+        cloudAppDisableListener.start();
+    }
+    
+    @Test
+    public void stop() {
+        regCenter.addCacheData("/state/disable/app");
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, "regCenter", 
regCenter);
+        cloudAppDisableListener.stop();
+    }
+}

Reply via email to