This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new 6882df0 Add cloud app configuration & disable listener (#1321)
6882df0 is described below
commit 6882df0662fd5ea961be50610242d91862158d75
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Aug 3 13:27:05 2020 +0800
Add cloud app configuration & disable listener (#1321)
* Add cloud app configuration listener
* Rename test method name
* Rename test method name
---
.../config/app/CloudAppConfigurationListener.java | 100 +++++++++++++++++
.../cloud/scheduler/mesos/SchedulerService.java | 12 ++
.../state/disable/app/CloudAppDisableListener.java | 106 ++++++++++++++++++
.../app/CloudAppConfigurationListenerTest.java | 105 ++++++++++++++++++
.../scheduler/mesos/SchedulerServiceTest.java | 11 +-
.../disable/app/CloudAppDisableListenerTest.java | 123 +++++++++++++++++++++
6 files changed, 456 insertions(+), 1 deletion(-)
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListener.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListener.java
new file mode 100644
index 0000000..a501747
--- /dev/null
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListener.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app;
+
+import com.google.gson.JsonParseException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.mesos.Protos;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos.MesosStateService;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import org.apache.shardingsphere.elasticjob.infra.exception.JobSystemException;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.Collection;
+import java.util.concurrent.Executors;
+
+/**
+ * Cloud app configuration change listener.
+ *
+ * <p>When an app configuration node below {@code CloudAppConfigurationNode.ROOT} is deleted,
+ * a {@code STOP} framework message is sent to every executor that
+ * {@code MesosStateService.executors(appName)} returns for that app.</p>
+ */
+@Slf4j
+public final class CloudAppConfigurationListener implements CuratorCacheListener {
+
+    private final CoordinatorRegistryCenter regCenter;
+
+    private final ProducerManager producerManager;
+
+    private MesosStateService mesosStateService;
+
+    public CloudAppConfigurationListener(final CoordinatorRegistryCenter regCenter, final ProducerManager producerManager) {
+        this.regCenter = regCenter;
+        this.producerManager = producerManager;
+        mesosStateService = new MesosStateService(regCenter);
+    }
+
+    /**
+     * Handle a cache event; only deletions of app configuration nodes are acted upon.
+     *
+     * @param type event type
+     * @param oldData node state before the event, may be {@code null}
+     * @param data node state after the event, may be {@code null}
+     */
+    @Override
+    public void event(final Type type, final ChildData oldData, final ChildData data) {
+        if (Type.NODE_DELETED != type) {
+            return;
+        }
+        // Curator reports the removed node as oldData and passes null as data for NODE_DELETED.
+        // Fall back to data so callers that only populate that argument keep working.
+        ChildData removed = null != oldData ? oldData : data;
+        if (null == removed) {
+            return;
+        }
+        String path = removed.getPath();
+        if (isJobAppConfigNode(path)) {
+            String appName = path.substring(CloudAppConfigurationNode.ROOT.length() + 1);
+            stopExecutors(appName);
+        }
+    }
+
+    /**
+     * Judge whether the path is a node below the app configuration root with a non-empty name.
+     *
+     * <p>The separator is part of the prefix so that siblings of the root which merely share
+     * its leading characters, and the bare root followed by a slash, are rejected.</p>
+     */
+    private boolean isJobAppConfigNode(final String path) {
+        String prefix = CloudAppConfigurationNode.ROOT + "/";
+        return path.startsWith(prefix) && path.length() > prefix.length();
+    }
+
+    /**
+     * Start listening for cloud app configuration changes.
+     */
+    public void start() {
+        getCache().listenable().addListener(this, Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Stop listening for cloud app configuration changes.
+     */
+    public void stop() {
+        getCache().listenable().removeListener(this);
+    }
+
+    /**
+     * Get the cache of the app configuration root, registering it in the registry center first when absent.
+     */
+    private CuratorCache getCache() {
+        CuratorCache result = (CuratorCache) regCenter.getRawCache(CloudAppConfigurationNode.ROOT);
+        if (null != result) {
+            return result;
+        }
+        regCenter.addCacheData(CloudAppConfigurationNode.ROOT);
+        return (CuratorCache) regCenter.getRawCache(CloudAppConfigurationNode.ROOT);
+    }
+
+    /**
+     * Send a {@code STOP} framework message to every executor reported for the app.
+     *
+     * @param appName name of the app whose executors are stopped
+     * @throws JobSystemException when looking up the executors fails with a JSON parse error
+     */
+    private void stopExecutors(final String appName) {
+        try {
+            Collection<MesosStateService.ExecutorStateInfo> executorBriefInfo = mesosStateService.executors(appName);
+            for (MesosStateService.ExecutorStateInfo each : executorBriefInfo) {
+                producerManager.sendFrameworkMessage(Protos.ExecutorID.newBuilder().setValue(each.getId()).build(),
+                        Protos.SlaveID.newBuilder().setValue(each.getSlaveId()).build(), "STOP".getBytes());
+            }
+        } catch (final JsonParseException ex) {
+            throw new JobSystemException(ex);
+        }
+    }
+}
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
index 2cf1092..aed52ad 100755
---
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerService.java
@@ -26,11 +26,13 @@ import org.apache.mesos.MesosSchedulerDriver;
import org.apache.mesos.Protos;
import org.apache.mesos.SchedulerDriver;
import org.apache.shardingsphere.elasticjob.cloud.console.ConsoleBootstrap;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfigurationListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.MesosConfiguration;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.ha.FrameworkIDService;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app.CloudAppDisableListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
@@ -66,6 +68,10 @@ public final class SchedulerService {
private final CloudJobDisableListener cloudJobDisableListener;
+ private final CloudAppConfigurationListener cloudAppConfigurationListener;
+
+ private final CloudAppDisableListener cloudAppDisableListener;
+
public SchedulerService(final CoordinatorRegistryCenter regCenter) {
env = BootstrapEnvironment.getINSTANCE();
facadeService = new FacadeService(regCenter);
@@ -76,6 +82,8 @@ public final class SchedulerService {
producerManager = new ProducerManager(schedulerDriver, regCenter);
cloudJobConfigurationListener = new
CloudJobConfigurationListener(regCenter, producerManager);
cloudJobDisableListener = new CloudJobDisableListener(regCenter,
producerManager);
+ cloudAppConfigurationListener = new
CloudAppConfigurationListener(regCenter, producerManager);
+ cloudAppDisableListener = new CloudAppDisableListener(regCenter,
producerManager);
taskLaunchScheduledService = new
TaskLaunchScheduledService(schedulerDriver, taskScheduler, facadeService,
jobEventBus);
reconcileService = new ReconcileService(schedulerDriver,
facadeService);
consoleBootstrap = new ConsoleBootstrap(regCenter,
env.getRestfulServerConfiguration(), producerManager, reconcileService);
@@ -120,7 +128,9 @@ public final class SchedulerService {
producerManager.startup();
statisticManager.startup();
cloudJobConfigurationListener.start();
+ cloudAppConfigurationListener.start();
cloudJobDisableListener.start();
+ cloudAppDisableListener.start();
taskLaunchScheduledService.startAsync();
consoleBootstrap.start();
schedulerDriver.start();
@@ -136,7 +146,9 @@ public final class SchedulerService {
consoleBootstrap.stop();
taskLaunchScheduledService.stopAsync();
cloudJobConfigurationListener.stop();
+ cloudAppConfigurationListener.stop();
cloudJobDisableListener.stop();
+ cloudAppDisableListener.stop();
statisticManager.shutdown();
producerManager.shutdown();
schedulerDriver.stop(true);
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListener.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListener.java
new file mode 100644
index 0000000..4610be7
--- /dev/null
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/main/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListener.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCache;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import
org.apache.shardingsphere.elasticjob.cloud.config.pojo.CloudJobConfigurationPOJO;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
+
+import java.util.Objects;
+import java.util.concurrent.Executors;
+
+/**
+ * Cloud app disable listener.
+ *
+ * <p>When a node below {@code DisableAppNode.ROOT} is created, every job whose configuration
+ * names that app is unscheduled; when the node is deleted, those jobs are rescheduled.</p>
+ */
+public final class CloudAppDisableListener implements CuratorCacheListener {
+
+    private final CoordinatorRegistryCenter regCenter;
+
+    private final ProducerManager producerManager;
+
+    private final CloudJobConfigurationService jobConfigService;
+
+    public CloudAppDisableListener(final CoordinatorRegistryCenter regCenter, final ProducerManager producerManager) {
+        this.regCenter = regCenter;
+        this.producerManager = producerManager;
+        jobConfigService = new CloudJobConfigurationService(regCenter);
+    }
+
+    /**
+     * Handle a cache event: creation of an app disable node disables the app, deletion enables it.
+     *
+     * @param type event type
+     * @param oldData node state before the event, may be {@code null}
+     * @param data node state after the event, may be {@code null}
+     */
+    @Override
+    public void event(final Type type, final ChildData oldData, final ChildData data) {
+        // Curator reports the removed node as oldData and passes null as data for NODE_DELETED.
+        // Fall back to data so callers that only populate that argument keep working.
+        ChildData changed = Type.NODE_DELETED == type && Objects.nonNull(oldData) ? oldData : data;
+        if (Objects.isNull(changed) || !isAppDisableNode(changed.getPath())) {
+            return;
+        }
+        String appName = changed.getPath().substring(DisableAppNode.ROOT.length() + 1);
+        if (Type.NODE_CREATED == type) {
+            disableApp(appName);
+        } else if (Type.NODE_DELETED == type) {
+            enableApp(appName);
+        }
+    }
+
+    /**
+     * Judge whether the path is a node below the app disable root with a non-empty name.
+     *
+     * <p>The separator is part of the prefix so that siblings of the root which merely share
+     * its leading characters, and the bare root followed by a slash, are rejected.</p>
+     */
+    private boolean isAppDisableNode(final String path) {
+        String prefix = DisableAppNode.ROOT + "/";
+        return path.startsWith(prefix) && path.length() > prefix.length();
+    }
+
+    /**
+     * Start listening for cloud app disable state changes.
+     */
+    public void start() {
+        getCache().listenable().addListener(this, Executors.newSingleThreadExecutor());
+    }
+
+    /**
+     * Stop listening for cloud app disable state changes.
+     */
+    public void stop() {
+        getCache().listenable().removeListener(this);
+    }
+
+    /**
+     * Get the cache of the app disable root, registering it in the registry center first when absent.
+     */
+    private CuratorCache getCache() {
+        CuratorCache result = (CuratorCache) regCenter.getRawCache(DisableAppNode.ROOT);
+        if (null != result) {
+            return result;
+        }
+        regCenter.addCacheData(DisableAppNode.ROOT);
+        return (CuratorCache) regCenter.getRawCache(DisableAppNode.ROOT);
+    }
+
+    /**
+     * Unschedule every loaded job whose configuration names the given app.
+     */
+    private void disableApp(final String appName) {
+        for (CloudJobConfigurationPOJO each : jobConfigService.loadAll()) {
+            if (appName.equals(each.getAppName())) {
+                producerManager.unschedule(each.getJobName());
+            }
+        }
+    }
+
+    /**
+     * Reschedule every loaded job whose configuration names the given app.
+     */
+    private void enableApp(final String appName) {
+        for (CloudJobConfigurationPOJO each : jobConfigService.loadAll()) {
+            if (appName.equals(each.getAppName())) {
+                producerManager.reschedule(each.getJobName());
+            }
+        }
+    }
+}
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListenerTest.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListenerTest.java
new file mode 100644
index 0000000..2167b0c
--- /dev/null
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/config/app/CloudAppConfigurationListenerTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.mesos.Protos;
+import org.apache.shardingsphere.elasticjob.cloud.ReflectionUtils;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListenerTest;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.EmbedTestingServer;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos.MesosStateService;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for CloudAppConfigurationListener: checks which NODE_DELETED paths lead to an
+ * executor lookup, and that start/stop can be invoked against a registry center.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class CloudAppConfigurationListenerTest {
+
+ private static ZookeeperRegistryCenter regCenter;
+
+ @Mock
+ private ProducerManager producerManager;
+
+ @Mock
+ private MesosStateService mesosStateService;
+
+ @InjectMocks
+ private CloudAppConfigurationListener cloudAppConfigurationListener;
+
+ /**
+ * Overwrite the listener's producerManager, mesosStateService and regCenter fields via
+ * reflection with the mocks and a freshly initialized registry center.
+ */
+ @Before
+ public void setUp() {
+ ReflectionUtils.setFieldValue(cloudAppConfigurationListener,
"producerManager", producerManager);
+ ReflectionUtils.setFieldValue(cloudAppConfigurationListener,
"mesosStateService", mesosStateService);
+ initRegistryCenter();
+ ReflectionUtils.setFieldValue(cloudAppConfigurationListener,
"regCenter", regCenter);
+ }
+
+ /**
+ * Start the embedded testing server and initialize a registry center connected to it.
+ * NOTE(review): the namespace is the class name of CloudJobConfigurationListenerTest,
+ * which looks like a copy-paste from that test - confirm whether this is intended.
+ */
+ private void initRegistryCenter() {
+ EmbedTestingServer.start();
+ ZookeeperConfiguration configuration = new
ZookeeperConfiguration(EmbedTestingServer.getConnectionString(),
CloudJobConfigurationListenerTest.class.getName());
+ configuration.setDigest("digest:password");
+ configuration.setSessionTimeoutMilliseconds(5000);
+ configuration.setConnectionTimeoutMilliseconds(5000);
+ regCenter = new ZookeeperRegistryCenter(configuration);
+ regCenter.init();
+ }
+
+ /**
+ * A deleted node at "/other/test_app" must neither query executors nor send a framework message.
+ */
+ @Test
+ public void assertRemoveWithInvalidPath() {
+
cloudAppConfigurationListener.event(CuratorCacheListener.Type.NODE_DELETED,
null, new ChildData("/other/test_app", null, "".getBytes()));
+ verify(mesosStateService, times(0)).executors(ArgumentMatchers.any());
+ verify(producerManager,
times(0)).sendFrameworkMessage(any(Protos.ExecutorID.class),
any(Protos.SlaveID.class), any());
+ }
+
+ /**
+ * A deleted node at "/config/app" (no app name segment) must neither query executors nor send a framework message.
+ */
+ @Test
+ public void assertRemoveWithNoAppNamePath() {
+
cloudAppConfigurationListener.event(CuratorCacheListener.Type.NODE_DELETED,
null, new ChildData("/config/app", null, "".getBytes()));
+ verify(mesosStateService, times(0)).executors(ArgumentMatchers.any());
+ verify(producerManager,
times(0)).sendFrameworkMessage(any(Protos.ExecutorID.class),
any(Protos.SlaveID.class), any());
+ }
+
+ /**
+ * A deleted node at "/config/app/test_app" must trigger an executor lookup for "test_app".
+ */
+ @Test
+ public void assertRemoveApp() {
+
cloudAppConfigurationListener.event(CuratorCacheListener.Type.NODE_DELETED,
null, new ChildData("/config/app/test_app", null, "".getBytes()));
+ verify(mesosStateService).executors("test_app");
+ }
+
+ /**
+ * Invoke start(); the test makes no assertions beyond the call completing.
+ */
+ @Test
+ public void start() {
+ cloudAppConfigurationListener.start();
+ }
+
+ /**
+ * Register the cache for the app configuration root, then invoke stop(); the test makes
+ * no assertions beyond the call completing.
+ */
+ @Test
+ public void stop() {
+ regCenter.addCacheData(CloudAppConfigurationNode.ROOT);
+ ReflectionUtils.setFieldValue(cloudAppConfigurationListener,
"regCenter", regCenter);
+ cloudAppConfigurationListener.stop();
+ }
+}
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
index 345a578..adcf05c 100755
---
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/mesos/SchedulerServiceTest.java
@@ -20,10 +20,12 @@ package
org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos;
import com.google.common.util.concurrent.Service;
import org.apache.mesos.SchedulerDriver;
import org.apache.shardingsphere.elasticjob.cloud.console.ConsoleBootstrap;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfigurationListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.env.FrameworkConfiguration;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app.CloudAppDisableListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.CloudJobDisableListener;
import
org.apache.shardingsphere.elasticjob.cloud.scheduler.statistics.StatisticManager;
import org.junit.Before;
@@ -71,13 +73,20 @@ public class SchedulerServiceTest {
@Mock
private CloudJobDisableListener cloudJobDisableListener;
+ @Mock
+ private CloudAppConfigurationListener cloudAppConfigurationListener;
+
+ @Mock
+ private CloudAppDisableListener cloudAppDisableListener;
+
private SchedulerService schedulerService;
@Before
public void setUp() {
schedulerService = new SchedulerService(env, facadeService,
schedulerDriver,
producerManager, statisticManager,
cloudJobConfigurationListener,
- taskLaunchScheduledService, consoleBootstrap,
reconcileService, cloudJobDisableListener);
+ taskLaunchScheduledService, consoleBootstrap,
reconcileService, cloudJobDisableListener,
+ cloudAppConfigurationListener, cloudAppDisableListener);
}
@Test
diff --git
a/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListenerTest.java
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListenerTest.java
new file mode 100644
index 0000000..7788cb6
--- /dev/null
+++
b/elasticjob-cloud/elasticjob-cloud-scheduler/src/test/java/org/apache/shardingsphere/elasticjob/cloud/scheduler/state/disable/app/CloudAppDisableListenerTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app;
+
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
+import org.apache.shardingsphere.elasticjob.cloud.ReflectionUtils;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationListenerTest;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.fixture.EmbedTestingServer;
+import
org.apache.shardingsphere.elasticjob.cloud.scheduler.producer.ProducerManager;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
+import
org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for CloudAppDisableListener: checks which NODE_CREATED / NODE_DELETED paths lead to
+ * the job configurations being loaded, and that start/stop can be invoked against a registry center.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class CloudAppDisableListenerTest {
+
+    private static ZookeeperRegistryCenter regCenter;
+
+    @Mock
+    private ProducerManager producerManager;
+
+    @Mock
+    private CloudJobConfigurationService jobConfigService;
+
+    @InjectMocks
+    private CloudAppDisableListener cloudAppDisableListener;
+
+    /**
+     * Overwrite the listener's producerManager, regCenter and jobConfigService fields via
+     * reflection with the mocks and a freshly initialized registry center.
+     */
+    @Before
+    public void setUp() {
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, "producerManager", producerManager);
+        initRegistryCenter();
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, "regCenter", regCenter);
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, "jobConfigService", jobConfigService);
+    }
+
+    /**
+     * Start the embedded testing server and initialize a registry center connected to it.
+     */
+    private void initRegistryCenter() {
+        EmbedTestingServer.start();
+        ZookeeperConfiguration configuration = new ZookeeperConfiguration(EmbedTestingServer.getConnectionString(), CloudJobConfigurationListenerTest.class.getName());
+        configuration.setDigest("digest:password");
+        configuration.setSessionTimeoutMilliseconds(5000);
+        configuration.setConnectionTimeoutMilliseconds(5000);
+        regCenter = new ZookeeperRegistryCenter(configuration);
+        regCenter.init();
+    }
+
+    @Test
+    public void assertDisableWithInvalidPath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, null, new ChildData("/other/test_app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+    }
+
+    @Test
+    public void assertDisableWithNoAppNamePath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, null, new ChildData("/state/disable/app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        verify(producerManager, times(0)).unschedule(ArgumentMatchers.any());
+    }
+
+    @Test
+    public void assertDisable() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_CREATED, null, new ChildData("/state/disable/app/app_test", null, "".getBytes()));
+        verify(jobConfigService).loadAll();
+    }
+
+    @Test
+    public void assertEnableWithInvalidPath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, null, new ChildData("/other/test_app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+
+    @Test
+    public void assertEnableWithNoAppNamePath() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, null, new ChildData("/state/disable/app", null, "".getBytes()));
+        verify(jobConfigService, times(0)).loadAll();
+        // The enable path calls reschedule, so that is the interaction which must be absent here.
+        verify(producerManager, times(0)).reschedule(ArgumentMatchers.any());
+    }
+
+    @Test
+    public void assertEnable() {
+        cloudAppDisableListener.event(CuratorCacheListener.Type.NODE_DELETED, null, new ChildData("/state/disable/app/app_test", null, "".getBytes()));
+        verify(jobConfigService).loadAll();
+    }
+
+    /**
+     * Invoke start(); the test makes no assertions beyond the call completing.
+     */
+    @Test
+    public void start() {
+        cloudAppDisableListener.start();
+    }
+
+    /**
+     * Register the cache for the app disable root, then invoke stop(); the test makes no
+     * assertions beyond the call completing.
+     */
+    @Test
+    public void stop() {
+        regCenter.addCacheData("/state/disable/app");
+        ReflectionUtils.setFieldValue(cloudAppDisableListener, "regCenter", regCenter);
+        cloudAppDisableListener.stop();
+    }
+}