This is an automated email from the ASF dual-hosted git repository. wujimin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-java-chassis.git
commit 082b0dccfd5d1e2194d9c70c99b9fcb52e7e9b20 Author: wujimin <wuji...@huawei.com> AuthorDate: Mon Jul 16 20:38:36 2018 +0800 [SCB-729] instance cache check task can be trigger periodically or manually --- .../diagnosis/instance/InstanceCacheCheckTask.java | 139 +++++++++++++++++++ ...egistry.registry.ServiceRegistryTaskInitializer | 18 +++ .../instance/TestInstanceCacheCheckTask.java | 151 +++++++++++++++++++++ 3 files changed, 308 insertions(+) diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/diagnosis/instance/InstanceCacheCheckTask.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/diagnosis/instance/InstanceCacheCheckTask.java new file mode 100644 index 0000000..b6dd615 --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/diagnosis/instance/InstanceCacheCheckTask.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.serviceregistry.diagnosis.instance; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.servicecomb.serviceregistry.consumer.AppManager; +import org.apache.servicecomb.serviceregistry.registry.RemoteServiceRegistry; +import org.apache.servicecomb.serviceregistry.registry.ServiceRegistryTaskInitializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.eventbus.EventBus; +import com.netflix.config.DynamicIntProperty; +import com.netflix.config.DynamicPropertyFactory; +import com.netflix.config.DynamicStringProperty; + +import io.vertx.core.json.Json; + +public class InstanceCacheCheckTask implements ServiceRegistryTaskInitializer { + private static final Logger LOGGER = LoggerFactory.getLogger(InstanceCacheCheckTask.class); + + private static final int DEFAULT_DIAGNOSE_INSTANCE_CACHE_INTERVAL_IN_HOUR = 24; + + private static final String CONFIG_PREFIX = "servicecomb.service.registry.instance.diagnose."; + + public static final String MANUAL = CONFIG_PREFIX + "manual"; + + public static final String AUTO_INTERVAL = CONFIG_PREFIX + "interval"; + + // auto task + private ScheduledFuture<?> scheduledFuture; + + private AppManager appManager; + + private ScheduledThreadPoolExecutor taskPool; + + private EventBus eventBus; + + private DynamicIntProperty autoCheckIntervalProperty; + + private DynamicStringProperty manualCheckProperty; + + private TimeUnit timeUnit = TimeUnit.HOURS; + + // make test easier + public void setTimeUnit(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } + + public void setAppManager(AppManager appManager) { + this.appManager = appManager; + } + + public void setTaskPool(ScheduledThreadPoolExecutor taskPool) { + this.taskPool = taskPool; + } + + public void setEventBus(EventBus eventBus) { + this.eventBus = eventBus; + } + + public DynamicStringProperty getManualCheckProperty() { + return manualCheckProperty; + } + + public DynamicIntProperty getAutoCheckIntervalProperty() { + return autoCheckIntervalProperty; + } + + @Override + public void init(RemoteServiceRegistry remoteServiceRegistry) { + appManager = remoteServiceRegistry.getAppManager(); + taskPool = remoteServiceRegistry.getTaskPool(); + eventBus = remoteServiceRegistry.getEventBus(); + + init(); + } + + protected void init() { + startAutoTask(); + registerManualTask(); + } + + private void registerManualTask() { + // if manual config item changed, then run task once + manualCheckProperty = DynamicPropertyFactory.getInstance().getStringProperty(MANUAL, null, this::runTask); + } + + protected void startAutoTask() { + autoCheckIntervalProperty = DynamicPropertyFactory.getInstance().getIntProperty(AUTO_INTERVAL, + DEFAULT_DIAGNOSE_INSTANCE_CACHE_INTERVAL_IN_HOUR, + this::doStartAutoTask); + doStartAutoTask(); + } + + private void doStartAutoTask() { + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + scheduledFuture = null; + } + + int interval = autoCheckIntervalProperty.get(); + if (interval <= 0) { + LOGGER.info("disable instance cache check task, interval={}.", interval); + return; + } + + scheduledFuture = taskPool.scheduleAtFixedRate(this::runTask, interval, interval, timeUnit); + } + + protected void runTask() { + try { + InstanceCacheChecker checker = new InstanceCacheChecker(appManager); + InstanceCacheSummary instanceCacheSummary = checker.check(); + eventBus.post(instanceCacheSummary); + + LOGGER.info("check instance cache, result={}.", Json.encode(instanceCacheSummary)); + } catch (Throwable e) { + LOGGER.error("failed check instance cache..", e); + } + } +} diff --git a/service-registry/src/main/resources/META-INF/services/org.apache.servicecomb.serviceregistry.registry.ServiceRegistryTaskInitializer b/service-registry/src/main/resources/META-INF/services/org.apache.servicecomb.serviceregistry.registry.ServiceRegistryTaskInitializer new file mode 100644 index 0000000..761ee02 --- /dev/null +++ b/service-registry/src/main/resources/META-INF/services/org.apache.servicecomb.serviceregistry.registry.ServiceRegistryTaskInitializer @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.servicecomb.serviceregistry.diagnosis.instance.InstanceCacheCheckTask diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/diagnosis/instance/TestInstanceCacheCheckTask.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/diagnosis/instance/TestInstanceCacheCheckTask.java new file mode 100644 index 0000000..862cf6f --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/diagnosis/instance/TestInstanceCacheCheckTask.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.servicecomb.serviceregistry.diagnosis.instance; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.xml.ws.Holder; + +import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils; +import org.apache.servicecomb.serviceregistry.consumer.AppManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.eventbus.EventBus; +import com.google.common.eventbus.Subscribe; + +import io.vertx.core.json.Json; +import mockit.Deencapsulation; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; + +public class TestInstanceCacheCheckTask { + @Mocked + AppManager appManager; + + ScheduledThreadPoolExecutor taskPool = new ScheduledThreadPoolExecutor(2, + task -> new Thread(task, "Service Center Task test thread"), + (task, executor) -> System.out.println("Too many pending tasks, reject " + task.getClass().getName())); + + EventBus eventBus = new EventBus(); + + InstanceCacheCheckTask task = new InstanceCacheCheckTask(); + + InstanceCacheSummary result; + + @Before + public void setUp() { + task.setAppManager(appManager); + task.setTaskPool(taskPool); + task.setEventBus(eventBus); + task.setTimeUnit(TimeUnit.MILLISECONDS); + + new MockUp<InstanceCacheChecker>() { + @Mock + InstanceCacheSummary check() { + return new InstanceCacheSummary(); + } + }; + } + + @After + public void tearDown() throws Exception { + ArchaiusUtils.resetConfig(); + taskPool.shutdownNow(); + } + + @Test + public void manualTask() throws InterruptedException { + + ArchaiusUtils.setProperty(InstanceCacheCheckTask.AUTO_INTERVAL, 0); + CountDownLatch latch = new CountDownLatch(1); + eventBus.register(new Object() { + @Subscribe + public void onChecked(InstanceCacheSummary instanceCacheSummary) { + result = instanceCacheSummary; + latch.countDown(); + } + }); + task.init(); + + ArchaiusUtils.setProperty(InstanceCacheCheckTask.MANUAL, UUID.randomUUID().toString()); + latch.await(); + + Assert.assertEquals("{\"status\":null,\"producers\":[],\"timestamp\":0}", Json.encode(result)); + } + + @Test + public void autoTask_normal() throws InterruptedException { + ArchaiusUtils.setProperty(InstanceCacheCheckTask.AUTO_INTERVAL, 1); + CountDownLatch latch = new CountDownLatch(1); + eventBus.register(new Object() { + @Subscribe + public void onChecked(InstanceCacheSummary instanceCacheSummary) { + result = instanceCacheSummary; + ((ScheduledFuture<?>) Deencapsulation.getField(task, "scheduledFuture")).cancel(false); + latch.countDown(); + } + }); + task.init(); + + latch.await(); + Assert.assertNotNull(Deencapsulation.getField(task, "scheduledFuture")); + Assert.assertEquals("{\"status\":null,\"producers\":[],\"timestamp\":0}", Json.encode(result)); + } + + @Test + public void autoTask_clearOldTask() { + Holder<Boolean> cancelResult = new Holder<>(); + ScheduledFuture<?> scheduledFuture = new MockUp<ScheduledFuture>() { + @Mock + boolean cancel(boolean mayInterruptIfRunning) { + cancelResult.value = true; + return true; + } + }.getMockInstance(); + + ArchaiusUtils.setProperty(InstanceCacheCheckTask.AUTO_INTERVAL, 0); + Deencapsulation.setField(task, "scheduledFuture", scheduledFuture); + task.init(); + + Assert.assertNull(Deencapsulation.getField(task, "scheduledFuture")); + Assert.assertTrue(cancelResult.value); + } + + @Test + public void autoTask_invalidIntervalZero() { + ArchaiusUtils.setProperty(InstanceCacheCheckTask.AUTO_INTERVAL, 0); + task.init(); + + Assert.assertNull(Deencapsulation.getField(task, "scheduledFuture")); + } + + @Test + public void autoTask_invalidIntervalLessThanZero() { + ArchaiusUtils.setProperty(InstanceCacheCheckTask.AUTO_INTERVAL, -1); + task.init(); + + Assert.assertNull(Deencapsulation.getField(task, "scheduledFuture")); + } +}