This is an automated email from the ASF dual-hosted git repository. yaohaishi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit e5d89b5e64bfe6f85199efcb3f7a3d452e50c14e Author: yhs0092 <[email protected]> AuthorDate: Sat Jan 18 15:17:42 2020 +0800 [SCB-1691] add ServiceRegistryCache --- .../serviceregistry/ServiceRegistry.java | 4 + .../registry/AbstractServiceRegistry.java | 8 + .../registry/cache/AggregateMicroserviceCache.java | 139 ++++++++ .../cache/AggregateServiceRegistryCache.java | 99 ++++++ .../registry/cache/MicroserviceCache.java | 68 ++++ .../registry/cache/MicroserviceCacheKey.java | 122 +++++++ .../cache/MicroserviceCacheRefreshedEvent.java | 32 ++ .../cache/RefreshableMicroserviceCache.java | 248 ++++++++++++++ .../cache/RefreshableServiceRegistryCache.java | 175 ++++++++++ .../registry/cache/ServiceRegistryCache.java | 30 ++ .../registry/EmptyMockServiceRegistry.java | 139 ++++++++ .../cache/AggregateMicroserviceCacheTest.java | 159 +++++++++ .../cache/AggregateServiceRegistryCacheTest.java | 212 ++++++++++++ .../registry/cache/MicroserviceCacheKeyTest.java | 88 +++++ .../registry/cache/MockedMicroserviceCache.java | 49 +++ .../cache/RefreshableMicroserviceCacheTest.java | 367 +++++++++++++++++++++ .../cache/RefreshableServiceRegistryCacheTest.java | 205 ++++++++++++ 17 files changed, 2144 insertions(+) diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java index 0e6e1a2..2de3e83 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java @@ -25,6 +25,8 @@ import org.apache.servicecomb.serviceregistry.api.registry.Microservice; import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey; import com.google.common.eventbus.EventBus; @@ -72,6 +74,8 @@ public interface ServiceRegistry { MicroserviceInstances findServiceInstances(String appId, String microserviceName, String microserviceVersionRule, String revision); + MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey); + boolean updateMicroserviceProperties(Map<String, String> properties); /** diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java index baf25dc..a37fdbc 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java @@ -46,6 +46,8 @@ import org.apache.servicecomb.serviceregistry.consumer.MicroserviceManager; import org.apache.servicecomb.serviceregistry.consumer.StaticMicroserviceVersions; import org.apache.servicecomb.serviceregistry.definition.MicroserviceDefinition; import org.apache.servicecomb.serviceregistry.definition.MicroserviceNameParser; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey; import org.apache.servicecomb.serviceregistry.task.MicroserviceServiceCenterTask; import org.apache.servicecomb.serviceregistry.task.ServiceCenterTask; import org.apache.servicecomb.serviceregistry.task.event.RecoveryEvent; @@ -246,6 +248,12 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry { } @Override + public MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) { + // TODO find MicroserviceCache from ServiceRegistryCache + return null; + } + + @Override public boolean updateMicroserviceProperties(Map<String, String> properties) { boolean success = srClient.updateMicroserviceProperties(microservice.getServiceId(), properties); diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateMicroserviceCache.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateMicroserviceCache.java new file mode 100644 index 0000000..a0082f7 --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateMicroserviceCache.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.servicecomb.serviceregistry.ServiceRegistry; +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; + +public class AggregateMicroserviceCache implements MicroserviceCache { + private MicroserviceCacheKey key; + + Map<String, MicroserviceCache> caches; + + AtomicLong revisionCounter = new AtomicLong(); + + private String revisionId = revisionCounter.toString(); + + private MicroserviceCacheStatus status = MicroserviceCacheStatus.INIT; + + private List<MicroserviceInstance> instances = new ArrayList<>(); + + Collection<ServiceRegistry> serviceRegistries; + + private final Object refreshLock = new Object(); + + public AggregateMicroserviceCache(MicroserviceCacheKey key, Collection<ServiceRegistry> serviceRegistries) { + this.key = key; + this.serviceRegistries = serviceRegistries; + + refresh(); + } + + @Override + public void refresh() { + refreshInnerState(false); + } + + private void refreshInnerState(boolean b) { + synchronized (refreshLock) { + fillInMicroserviceCaches(b); + fillInInstanceList(); + updateRevisionId(); + refreshStatus(); + } + } + + @Override + public void forceRefresh() { + refreshInnerState(true); + } + + private void fillInMicroserviceCaches(boolean isForce) { + HashMap<String, MicroserviceCache> cacheMap = new LinkedHashMap<>(); + for (ServiceRegistry serviceRegistry : serviceRegistries) { + MicroserviceCache microserviceCache = serviceRegistry.findMicroserviceCache(key); + if (!isValidMicroserviceCache(microserviceCache)) { + continue; + } + if (isForce) { + microserviceCache.forceRefresh(); + } + cacheMap.put(serviceRegistry.getName(), microserviceCache); + } + caches = cacheMap; + } + + private void fillInInstanceList() { + ArrayList<MicroserviceInstance> instances = new ArrayList<>(); + for (Entry<String, MicroserviceCache> stringMicroserviceCacheEntry : caches.entrySet()) { + instances.addAll(stringMicroserviceCacheEntry.getValue().getInstances()); + } + this.instances = Collections.unmodifiableList(instances); + } + + private void updateRevisionId() { + revisionCounter.incrementAndGet(); + revisionId = revisionCounter.toString(); + } + + private void refreshStatus() { + if (caches.size() == 0) { + status = MicroserviceCacheStatus.SERVICE_NOT_FOUND; + } else { + status = MicroserviceCacheStatus.REFRESHED; + } + } + + private boolean isValidMicroserviceCache(MicroserviceCache microserviceCache) { + return !( + Objects.isNull(microserviceCache) + || MicroserviceCacheStatus.SERVICE_NOT_FOUND.equals(microserviceCache.getStatus()) + ); + } + + @Override + public MicroserviceCacheKey getKey() { + return key; + } + + @Override + public List<MicroserviceInstance> getInstances() { + return instances; + } + + @Override + public String getRevisionId() { + return revisionId; + } + + @Override + public MicroserviceCacheStatus getStatus() { + return status; + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateServiceRegistryCache.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateServiceRegistryCache.java new file mode 100644 index 0000000..687cccc --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateServiceRegistryCache.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; +import org.apache.servicecomb.serviceregistry.ServiceRegistry; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.eventbus.Subscribe; + +public class AggregateServiceRegistryCache implements ServiceRegistryCache { + + private static final Logger LOGGER = LoggerFactory.getLogger(AggregateServiceRegistryCache.class); + + Collection<ServiceRegistry> serviceRegistries; + + final Map<MicroserviceCacheKey, AggregateMicroserviceCache> microserviceCache = new ConcurrentHashMapEx<>(); + + private Consumer<List<MicroserviceCache>> cacheRefreshedWatcher; + + public AggregateServiceRegistryCache(Collection<ServiceRegistry> serviceRegistries) { + this.serviceRegistries = serviceRegistries; + } + + @Override + public MicroserviceCache findServiceCache(MicroserviceCacheKey microserviceCacheKey) { + AggregateMicroserviceCache microserviceCache = this.microserviceCache.computeIfAbsent(microserviceCacheKey, + key -> new AggregateMicroserviceCache(key, serviceRegistries)); + removeMicroserviceCacheIfNotExist(microserviceCache); + return microserviceCache; + } + + @Override + public ServiceRegistryCache setCacheRefreshedWatcher(Consumer<List<MicroserviceCache>> cacheRefreshedWatcher) { + this.cacheRefreshedWatcher = cacheRefreshedWatcher; + return this; + } + + @Subscribe + public void onMicroserviceCacheRefreshed(MicroserviceCacheRefreshedEvent event) { + List<MicroserviceCache> microserviceCaches = event.getMicroserviceCaches(); + if (null == microserviceCaches || microserviceCaches.isEmpty()) { + return; + } + + List<MicroserviceCache> refreshedAggregateMicroserviceCaches = microserviceCaches.stream() + .map(cache -> this.microserviceCache.get(cache.getKey())) + .filter(Objects::nonNull) + .peek(AggregateMicroserviceCache::refresh) + .peek(this::removeMicroserviceCacheIfNotExist) + .collect(Collectors.toList()); + + LOGGER.info("[{}] caches get refreshed", refreshedAggregateMicroserviceCaches.size()); + refreshedAggregateMicroserviceCaches.forEach(cache -> { + LOGGER.info("[{}]: status={}, revisionId={}", cache.getKey(), cache.getStatus(), cache.getRevisionId()); + }); + + if (null != cacheRefreshedWatcher) { + cacheRefreshedWatcher.accept(refreshedAggregateMicroserviceCaches); + } + } + + private void removeMicroserviceCacheIfNotExist(MicroserviceCache cache) { + if (MicroserviceCacheStatus.SERVICE_NOT_FOUND.equals(cache.getStatus())) { + microserviceCache.remove(cache.getKey()); + LOGGER.info("microserviceCache[{}] got removed", cache.getKey()); + } + } + + @Override + public Map<MicroserviceCacheKey, MicroserviceCache> getMicroserviceCaches() { + return Collections.unmodifiableMap(microserviceCache); + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCache.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCache.java new file mode 100644 index 0000000..d6a49c2 --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCache.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.List; + +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; + +public interface MicroserviceCache { + MicroserviceCacheKey getKey(); + + List<MicroserviceInstance> getInstances(); + + String getRevisionId(); + + MicroserviceCacheStatus getStatus(); + + void refresh(); + + void forceRefresh(); + + enum MicroserviceCacheStatus { + /** + * init status, not pull instances from sc yet + */ + INIT, + /** + * unknown error + */ + UNKNOWN_ERROR, + /** + * error occurs while getting access to service center + */ + CLIENT_ERROR, + /** + * success to query the service center, but no target microservice found + */ + SERVICE_NOT_FOUND, + /** + * success to query the service center, but the target microservice instance list is not changed + */ + NO_CHANGE, + /** + * success to query the service center, and the target microservice instance list is changed. + * the cached instance list gets refreshed successfully. + */ + REFRESHED, + /** + * unknown error occurs while setting the pulled instances into this cache + */ + SETTING_CACHE_ERROR + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheKey.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheKey.java new file mode 100644 index 0000000..103df5a --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheKey.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.Objects; + +import org.apache.servicecomb.serviceregistry.definition.DefinitionConst; +import org.apache.servicecomb.serviceregistry.definition.MicroserviceNameParser; + +public class MicroserviceCacheKey { + private String env; + + private String appId; + + private String serviceName; + + private static final String VERSION_RULE = DefinitionConst.VERSION_RULE_ALL; + + public static MicroserviceCacheKeyBuilder builder() { + return new MicroserviceCacheKeyBuilder(); + } + + MicroserviceCacheKey() { + } + + public void validate() { + Objects.requireNonNull(this.env, "microserviceCacheKey.env is null"); + Objects.requireNonNull(this.appId, "microserviceCacheKey.appId is null"); + Objects.requireNonNull(this.serviceName, "microserviceCacheKey.serviceName is null"); + } + + public String getEnv() { + return env; + } + + public String getAppId() { + return appId; + } + + public String getServiceName() { + return serviceName; + } + + public String getVersionRule() { + return VERSION_RULE; + } + + public String plainKey() { + return serviceName + "@" + appId + "@" + env; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + MicroserviceCacheKey that = (MicroserviceCacheKey) o; + return Objects.equals(env, that.env) && + Objects.equals(appId, that.appId) && + Objects.equals(serviceName, that.serviceName); + } + + @Override + public int hashCode() { + return Objects.hash(env, appId, serviceName); + } + + @Override + public String toString() { + return plainKey(); + } + + public static class MicroserviceCacheKeyBuilder { + private MicroserviceCacheKey microserviceCacheKey; + + public MicroserviceCacheKey build() { + microserviceCacheKey.validate(); + MicroserviceNameParser microserviceNameParser = + new MicroserviceNameParser(microserviceCacheKey.appId, microserviceCacheKey.serviceName); + microserviceCacheKey.appId = microserviceNameParser.getAppId(); + microserviceCacheKey.serviceName = microserviceNameParser.getShortName(); + return microserviceCacheKey; + } + + public MicroserviceCacheKeyBuilder env(String env) { + microserviceCacheKey.env = env; + return this; + } + + public MicroserviceCacheKeyBuilder appId(String appId) { + microserviceCacheKey.appId = appId; + return this; + } + + public MicroserviceCacheKeyBuilder serviceName(String serviceName) { + microserviceCacheKey.serviceName = serviceName; + return this; + } + + MicroserviceCacheKeyBuilder() { + microserviceCacheKey = new MicroserviceCacheKey(); + } + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheRefreshedEvent.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheRefreshedEvent.java new file mode 100644 index 0000000..2bfea47 --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheRefreshedEvent.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.List; + +public class MicroserviceCacheRefreshedEvent { + private final List<MicroserviceCache> microserviceCaches; + + public MicroserviceCacheRefreshedEvent(List<MicroserviceCache> microserviceCaches) { + this.microserviceCaches = microserviceCaches; + } + + public List<MicroserviceCache> getMicroserviceCaches() { + return microserviceCaches; + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableMicroserviceCache.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableMicroserviceCache.java new file mode 100644 index 0000000..9a74cf0 --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableMicroserviceCache.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils; +import org.apache.servicecomb.serviceregistry.api.Const; +import org.apache.servicecomb.serviceregistry.api.registry.Microservice; +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; +import org.apache.servicecomb.serviceregistry.api.response.MicroserviceInstanceChangedEvent; +import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; +import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances; +import org.apache.servicecomb.serviceregistry.consumer.MicroserviceInstancePing; +import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RefreshableMicroserviceCache implements MicroserviceCache { + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshableMicroserviceCache.class); + + MicroserviceCacheKey key; + + List<MicroserviceInstance> instances = Collections.unmodifiableList(new ArrayList<>()); + + Microservice consumerService; + + String revisionId; + + ServiceRegistryClient srClient; + + boolean safeMode; + + MicroserviceCacheStatus status = MicroserviceCacheStatus.INIT; + + private final Object SET_OPERATION_LOCK = new Object(); + + boolean emptyInstanceProtectionEnabled; + + MicroserviceInstancePing instancePing = SPIServiceUtils.getPriorityHighestService(MicroserviceInstancePing.class); + + RefreshableMicroserviceCache(Microservice consumerService, MicroserviceCacheKey key, ServiceRegistryClient srClient, + boolean emptyInstanceProtectionEnabled) { + this.key = key; + this.consumerService = consumerService; + this.srClient = srClient; + this.emptyInstanceProtectionEnabled = emptyInstanceProtectionEnabled; + } + + @Override + public void refresh() { + safePullInstance(revisionId); + } + + @Override + public void forceRefresh() { + safePullInstance(null); + } + + void safePullInstance(String revisionId) { + try { + pullInstance(revisionId); + } catch (Throwable e) { + LOGGER.error("unknown error occurs while pulling instances", e); + setStatus(MicroserviceCacheStatus.UNKNOWN_ERROR); + } + } + + void pullInstance(String revisionId) { + MicroserviceInstances serviceInstances = pullInstanceFromServiceCenter(revisionId); + + if (serviceInstances == null) { + LOGGER.error("Can not find any instances from service center due to previous errors. service={}/{}/{}", + key.getAppId(), + key.getServiceName(), + key.getVersionRule()); + setStatus(MicroserviceCacheStatus.CLIENT_ERROR); + return; + } + + if (serviceInstances.isMicroserviceNotExist()) { + setStatus(MicroserviceCacheStatus.SERVICE_NOT_FOUND); + return; + } + + if (!serviceInstances.isNeedRefresh()) { + LOGGER.debug("instances revision is not changed, service={}/{}/{}", key.getAppId(), key.getServiceName(), + key.getVersionRule()); + setStatus(MicroserviceCacheStatus.NO_CHANGE); + return; + } + + List<MicroserviceInstance> instances = serviceInstances.getInstancesResponse().getInstances(); + LOGGER.info("find instances[{}] from service center success. service={}/{}/{}, old revision={}, new revision={}", + instances.size(), + key.getAppId(), + key.getServiceName(), + key.getVersionRule(), + this.revisionId, + serviceInstances.getRevision()); + for (MicroserviceInstance instance : instances) { + LOGGER.info("service id={}, instance id={}, endpoints={}", + instance.getServiceId(), + instance.getInstanceId(), + instance.getEndpoints()); + } + safeSetInstances(instances, serviceInstances.getRevision()); + } + + MicroserviceInstances pullInstanceFromServiceCenter(String revisionId) { + return srClient.findServiceInstances(consumerService.getServiceId(), + key.getAppId(), key.getServiceName(), key.getVersionRule(), revisionId); + } + + private void safeSetInstances(List<MicroserviceInstance> pulledInstances, String rev) { + try { + synchronized (SET_OPERATION_LOCK) { + setInstances(pulledInstances, rev); + setStatus(MicroserviceCacheStatus.REFRESHED); + } + } catch (Throwable e) { + setStatus(MicroserviceCacheStatus.SETTING_CACHE_ERROR); + LOGGER.error("Failed to setInstances, appId={}, microserviceName={}.", + key.getAppId(), + key.getServiceName(), + e); + } + } + + private void setInstances(List<MicroserviceInstance> pulledInstances, String rev) { + Set<MicroserviceInstance> mergedInstances = mergeInstances(pulledInstances); + LOGGER.debug("actually set instances[{}] for {}", mergedInstances.size(), key.plainKey()); + for (MicroserviceInstance mergedInstance : mergedInstances) { + LOGGER.debug("serviceId={}, instanceId={}, endpoints={}", + mergedInstance.getServiceId(), + mergedInstance.getInstanceId(), + mergedInstance.getEndpoints()); + } + instances = Collections.unmodifiableList(new ArrayList<>(mergedInstances)); + revisionId = rev; + } + + protected Set<MicroserviceInstance> mergeInstances(List<MicroserviceInstance> pulledInstances) { + Set<MicroserviceInstance> mergedInstances = new LinkedHashSet<>(pulledInstances); + + if (safeMode) { + // in safe mode, instances will never be deleted + mergedInstances.addAll(instances); + return mergedInstances; + } + + if (!inEmptyPulledInstancesProtectionSituation(pulledInstances)) { + return mergedInstances; + } + + if (null == instancePing) { + LOGGER.info("no MicroserviceInstancePing implementation loaded, abandon the old instance list"); + return mergedInstances; + } + + instances.forEach(instance -> { + if (!mergedInstances.contains(instance)) { + if (instancePing.ping(instance)) { + mergedInstances.add(instance); + } + } + }); + return mergedInstances; + } + + private boolean inEmptyPulledInstancesProtectionSituation(List<MicroserviceInstance> pulledInstances) { + return pulledInstances.isEmpty() + && instances != null + && !instances.isEmpty() + && isEmptyInstanceProtectionEnabled(); + } + + @Override + public MicroserviceCacheKey getKey() { + return key; + } + + @Override + public List<MicroserviceInstance> getInstances() { + return instances; + } + + @Override + public String getRevisionId() { + return revisionId; + } + + @Override + public MicroserviceCacheStatus getStatus() { + return status; + } + + void setStatus(MicroserviceCacheStatus status) { + this.status = status; + } + + boolean isEmptyInstanceProtectionEnabled() { + return emptyInstanceProtectionEnabled; + } + + void setEmptyInstanceProtectionEnabled(boolean emptyInstanceProtectionEnabled) { + this.emptyInstanceProtectionEnabled = emptyInstanceProtectionEnabled; + } + + void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent event) { + if (!microserviceMatched(event)) { + return; + } + refresh(); + } + + void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) { + this.safeMode = modeChangeEvent.getCurrentMode(); + } + + private boolean microserviceMatched(MicroserviceInstanceChangedEvent event) { + return (key.getAppId().equals(event.getKey().getAppId())) // appId matched + && ( // microserviceName matched + key.getServiceName().equals(event.getKey().getServiceName()) + || key.getServiceName().equals( + event.getKey().getAppId() + Const.APP_SERVICE_SEPARATOR + event.getKey().getServiceName() + )); + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableServiceRegistryCache.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableServiceRegistryCache.java new file mode 100644 index 0000000..7bd1a1b --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableServiceRegistryCache.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx; +import org.apache.servicecomb.serviceregistry.api.registry.Microservice; +import org.apache.servicecomb.serviceregistry.api.response.MicroserviceInstanceChangedEvent; +import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus; +import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache the pulled microservice instances. + */ +public class RefreshableServiceRegistryCache implements ServiceRegistryCache { + private static final Logger LOGGER = LoggerFactory.getLogger(RefreshableServiceRegistryCache.class); + + Map<MicroserviceCacheKey, RefreshableMicroserviceCache> microserviceCache = new ConcurrentHashMapEx<>(); + + Microservice consumerService; + + ServiceRegistryClient srClient; + + boolean emptyInstanceProtectionEnabled = false; + + Consumer<List<MicroserviceCache>> cacheRefreshedWatcher; + + ReentrantLock refreshLock = new ReentrantLock(); + + public RefreshableServiceRegistryCache(Microservice consumerService, ServiceRegistryClient srClient) { + this.consumerService = consumerService; + this.srClient = srClient; + } + + public void refreshCache() { + if (!refreshLock.tryLock()) { + LOGGER.info("ignore concurrent refresh request"); + return; + } + + try { + List<MicroserviceCache> refreshedCaches = refreshInnerState(false); + notifyWatcher(refreshedCaches); + } catch (Exception e) { + LOGGER.error("failed to refresh caches", e); + } finally { + refreshLock.unlock(); + } + } + + public void forceRefreshCache() { + refreshLock.lock(); + try { + List<MicroserviceCache> refreshedCaches = refreshInnerState(true); + notifyWatcher(refreshedCaches); + } catch (Exception e) { + LOGGER.error("failed to refresh caches", e); + } finally { + refreshLock.unlock(); + } + } + + private List<MicroserviceCache> refreshInnerState(boolean isForced) { + return microserviceCache.values().stream() + .peek(cache -> { + if (isForced) { + cache.forceRefresh(); + } else { + cache.refresh(); + } + }) + .filter(this::isRefreshedMicroserviceCache) + .peek(this::removeCacheIfServiceNotFound) + .collect(Collectors.toList()); + } + + private boolean isRefreshedMicroserviceCache(MicroserviceCache microserviceCache) { + return MicroserviceCacheStatus.REFRESHED.equals(microserviceCache.getStatus()) + || MicroserviceCacheStatus.SERVICE_NOT_FOUND.equals(microserviceCache.getStatus()); + } + + private void notifyWatcher(List<MicroserviceCache> refreshedCaches) { + if (refreshedCaches.isEmpty() || null == cacheRefreshedWatcher) { + return; + } + cacheRefreshedWatcher.accept(refreshedCaches); + } + + @Override + public MicroserviceCache findServiceCache(MicroserviceCacheKey microserviceCacheKey) { + microserviceCacheKey.validate(); + RefreshableMicroserviceCache targetCache = microserviceCache + .computeIfAbsent(microserviceCacheKey, pk -> { + RefreshableMicroserviceCache microserviceCache = createMicroserviceCache(microserviceCacheKey); + microserviceCache.refresh(); + return microserviceCache; + }); + removeCacheIfServiceNotFound(targetCache); + return targetCache; + } + + private void removeCacheIfServiceNotFound(MicroserviceCache targetCache) { + if (MicroserviceCacheStatus.SERVICE_NOT_FOUND.equals(targetCache.getStatus())) { + microserviceCache.remove(targetCache.getKey()); + LOGGER.info("microserviceCache[{}] got removed", targetCache.getKey()); + } + } + + RefreshableMicroserviceCache createMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) { + return new RefreshableMicroserviceCache( + consumerService, + microserviceCacheKey, + srClient, + emptyInstanceProtectionEnabled); + } + + public RefreshableServiceRegistryCache setEmptyInstanceProtectionEnabled(boolean emptyInstanceProtectionEnabled) { + this.emptyInstanceProtectionEnabled = emptyInstanceProtectionEnabled; + return this; + } + + @Override + public ServiceRegistryCache setCacheRefreshedWatcher( + Consumer<List<MicroserviceCache>> cacheRefreshedWatcher) { + this.cacheRefreshedWatcher = cacheRefreshedWatcher; + return this; + } + + public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent event) { + List<MicroserviceCache> refreshedCaches = + microserviceCache.entrySet().stream() + .peek(cacheEntry -> cacheEntry.getValue().onMicroserviceInstanceChanged(event)) + .filter(cacheEntry -> isRefreshedMicroserviceCache(cacheEntry.getValue())) + .map(Entry::getValue) + .collect(Collectors.toList()); + + notifyWatcher(refreshedCaches); + } + + public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) { + for (Entry<MicroserviceCacheKey, RefreshableMicroserviceCache> cacheEntry : microserviceCache.entrySet()) { + cacheEntry.getValue().onSafeModeChanged(modeChangeEvent); + } + } + + @Override + public Map<MicroserviceCacheKey, MicroserviceCache> getMicroserviceCaches() { + return Collections.unmodifiableMap(microserviceCache); + } +} diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/ServiceRegistryCache.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/ServiceRegistryCache.java new file mode 100644 index 0000000..fdc27fe --- /dev/null +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/cache/ServiceRegistryCache.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +public interface ServiceRegistryCache { + MicroserviceCache findServiceCache(MicroserviceCacheKey microserviceCacheKey); + + ServiceRegistryCache setCacheRefreshedWatcher(Consumer<List<MicroserviceCache>> cacheRefreshedWatcher); + + Map<MicroserviceCacheKey, MicroserviceCache> getMicroserviceCaches(); +} diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/EmptyMockServiceRegistry.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/EmptyMockServiceRegistry.java new file mode 100644 index 0000000..d14532e --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/EmptyMockServiceRegistry.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.servicecomb.serviceregistry.Features; +import org.apache.servicecomb.serviceregistry.ServiceRegistry; +import org.apache.servicecomb.serviceregistry.api.registry.Microservice; +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; +import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; +import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey; + +import com.google.common.eventbus.EventBus; + +public class EmptyMockServiceRegistry implements ServiceRegistry { + @Override + public String getName() { + return null; + } + + @Override + public void init() { + + } + + @Override + public void run() { + + } + + @Override + public void destroy() { + + } + + @Override + public EventBus getEventBus() { + return null; + } + + @Override + public Set<String> getCombinedMicroserviceNames() { + return null; + } + + @Override + public String getAppId() { + return null; + } + + @Override + public Microservice getMicroservice() { + return null; + } + + @Override + public MicroserviceInstance getMicroserviceInstance() { + return null; + } + + @Override + public ServiceRegistryClient getServiceRegistryClient() { + return null; + } + + @Override + public List<MicroserviceInstance> findServiceInstance(String appId, String microserviceName, + String microserviceVersionRule) { + return null; + } + + @Override + public MicroserviceInstances findServiceInstances(String appId, String microserviceName, + String microserviceVersionRule, String revision) { + return null; + } + + @Override + public MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) { + return null; + } + + @Override + public boolean updateMicroserviceProperties(Map<String, String> properties) { + return false; + } + + @Override + public boolean updateInstanceProperties(Map<String, String> instanceProperties) { + return false; + } + + @Override + public Microservice getRemoteMicroservice(String microserviceId) { + return null; + } + + @Override + public Microservice getAggregatedRemoteMicroservice(String microserviceId) { + return null; + } + + @Override + public Features getFeatures() { + return null; + } + + @Override + public void registerMicroserviceMapping(String microserviceName, String version, List<MicroserviceInstance> instances, + Class<?> schemaIntfCls) { + + } + + @Override + public void registerMicroserviceMappingByEndpoints(String microserviceName, String version, List<String> endpoints, + Class<?> schemaIntfCls) { + + } +} diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateMicroserviceCacheTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateMicroserviceCacheTest.java new file mode 100644 index 0000000..9de57c8 --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateMicroserviceCacheTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.servicecomb.serviceregistry.ServiceRegistry; +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; +import org.apache.servicecomb.serviceregistry.registry.EmptyMockServiceRegistry; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus; +import org.junit.Assert; +import org.junit.Test; + +public class AggregateMicroserviceCacheTest { + + @Test + public void refresh() { + MicroserviceCacheKey microserviceCacheKey = + MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("production").build(); + + MockMicroserviceCache mockMicroserviceCache0 = new MockMicroserviceCache(microserviceCacheKey, + MicroserviceCacheStatus.NO_CHANGE); + MockMicroserviceCache mockMicroserviceCache2 = new MockMicroserviceCache(microserviceCacheKey, + MicroserviceCacheStatus.REFRESHED); + mockMicroserviceCache2.instances = Arrays.asList(new MicroserviceInstance(), new MicroserviceInstance()); + MockMicroserviceCache mockMicroserviceCache3 = new MockMicroserviceCache(microserviceCacheKey, + MicroserviceCacheStatus.SERVICE_NOT_FOUND); + + MockServiceRegistry mockServiceRegistry0 = new MockServiceRegistry().setName("s0") + .addCache(mockMicroserviceCache0) + .addCache(new MockMicroserviceCache( + MicroserviceCacheKey.builder().serviceName("svc2").appId("app").env("production").build(), + MicroserviceCacheStatus.REFRESHED)); + MockServiceRegistry mockServiceRegistry1 = new MockServiceRegistry().setName("s1"); + MockServiceRegistry mockServiceRegistry2 = new MockServiceRegistry().setName("s2") + .addCache(mockMicroserviceCache2); + MockServiceRegistry mockServiceRegistry3 = new MockServiceRegistry().setName("s3") + .addCache(mockMicroserviceCache3); + + List<ServiceRegistry> serviceRegistries = Arrays.asList( + mockServiceRegistry0, + mockServiceRegistry1, + mockServiceRegistry2, + mockServiceRegistry3 + ); + + AggregateMicroserviceCache compositeMicroserviceCache = new AggregateMicroserviceCache( + microserviceCacheKey, + serviceRegistries); + + // Test initialization + // key + Assert.assertSame(microserviceCacheKey, compositeMicroserviceCache.getKey()); + // status + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, compositeMicroserviceCache.getStatus()); + // revision + Assert.assertEquals("1", compositeMicroserviceCache.getRevisionId()); + Assert.assertEquals(1L, compositeMicroserviceCache.revisionCounter.get()); + // MicroserviceCache map + Assert.assertEquals(2, compositeMicroserviceCache.caches.size()); + Assert.assertSame(mockMicroserviceCache0, compositeMicroserviceCache.caches.get("s0")); + Assert.assertSame(mockMicroserviceCache2, compositeMicroserviceCache.caches.get("s2")); + // ServiceRegistry collection + Assert.assertEquals(serviceRegistries.size(), compositeMicroserviceCache.serviceRegistries.size()); + Iterator<ServiceRegistry> serviceRegistryIterator = compositeMicroserviceCache.serviceRegistries.iterator(); + Assert.assertSame(serviceRegistries.get(0), serviceRegistryIterator.next()); + Assert.assertSame(serviceRegistries.get(1), serviceRegistryIterator.next()); + Assert.assertSame(serviceRegistries.get(2), serviceRegistryIterator.next()); + Assert.assertSame(serviceRegistries.get(3), serviceRegistryIterator.next()); + // cached instances + Assert.assertEquals(2, compositeMicroserviceCache.getInstances().size()); + Assert.assertSame(mockMicroserviceCache2.instances.get(0), compositeMicroserviceCache.getInstances().get(0)); + Assert.assertSame(mockMicroserviceCache2.instances.get(1), compositeMicroserviceCache.getInstances().get(1)); + + // Test refresh() + mockMicroserviceCache0.instances = Collections.singletonList(new MicroserviceInstance()); + mockMicroserviceCache2.instances = Collections.singletonList(new MicroserviceInstance()); + compositeMicroserviceCache.refresh(); + // status + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, compositeMicroserviceCache.getStatus()); + // revision + Assert.assertEquals("2", compositeMicroserviceCache.getRevisionId()); + Assert.assertEquals(2L, compositeMicroserviceCache.revisionCounter.get()); + // cached instances + Assert.assertEquals(2, compositeMicroserviceCache.getInstances().size()); + Assert.assertSame(mockMicroserviceCache0.instances.get(0), compositeMicroserviceCache.getInstances().get(0)); + Assert.assertSame(mockMicroserviceCache2.instances.get(0), compositeMicroserviceCache.getInstances().get(1)); + + // Test refresh() + // microservice deleted and registered + mockMicroserviceCache0.status = MicroserviceCacheStatus.SERVICE_NOT_FOUND; + mockMicroserviceCache3.status = MicroserviceCacheStatus.REFRESHED; + compositeMicroserviceCache.refresh(); + // status + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, compositeMicroserviceCache.getStatus()); + // revision + Assert.assertEquals("3", compositeMicroserviceCache.getRevisionId()); + Assert.assertEquals(3L, compositeMicroserviceCache.revisionCounter.get()); + // ServiceRegistries + Assert.assertNotNull(compositeMicroserviceCache.caches.get("s2")); + Assert.assertNotNull(compositeMicroserviceCache.caches.get("s3")); + // cached instances + Assert.assertEquals(1, compositeMicroserviceCache.getInstances().size()); + Assert.assertSame(mockMicroserviceCache2.instances.get(0), compositeMicroserviceCache.getInstances().get(0)); + } + + public static class MockServiceRegistry extends EmptyMockServiceRegistry { + String name; + + Map<MicroserviceCacheKey, MicroserviceCache> cacheMap = new HashMap<>(); + + public MockServiceRegistry setName(String name) { + this.name = name; + return this; + } + + public MockServiceRegistry addCache(MicroserviceCache microserviceCache) { + cacheMap.put(microserviceCache.getKey(), microserviceCache); + return this; + } + + @Override + public String getName() { + return name; + } + + @Override + public MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) { + return cacheMap.get(microserviceCacheKey); + } + } + + public static class MockMicroserviceCache extends RefreshableMicroserviceCache { + public MockMicroserviceCache(MicroserviceCacheKey key, MicroserviceCacheStatus microserviceCacheStatus) { + super(null, key, null, false); + setStatus(microserviceCacheStatus); + } + } +} diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateServiceRegistryCacheTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateServiceRegistryCacheTest.java new file mode 100644 index 0000000..3ce08f5 --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/AggregateServiceRegistryCacheTest.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; + +import org.apache.servicecomb.serviceregistry.registry.cache.AggregateMicroserviceCacheTest.MockMicroserviceCache; +import org.apache.servicecomb.serviceregistry.registry.cache.AggregateMicroserviceCacheTest.MockServiceRegistry; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus; +import org.junit.Before; +import org.junit.Test; + +public class AggregateServiceRegistryCacheTest { + + private MicroserviceCacheKey microserviceCacheKey; + + private MockServiceRegistry mockServiceRegistry0; + + private MockServiceRegistry mockServiceRegistry1; + + private MockServiceRegistry mockServiceRegistry2; + + private AggregateServiceRegistryCache aggregateServiceRegistryCache; + + @Before + public void before() { + microserviceCacheKey = MicroserviceCacheKey.builder() + .serviceName("svc").appId("app").env("env").build(); + + mockServiceRegistry0 = new MockServiceRegistry() + .setName("s0") + .addCache(new MockMicroserviceCache( + microserviceCacheKey, + MicroserviceCacheStatus.NO_CHANGE)); + mockServiceRegistry1 = new MockServiceRegistry() + .setName("s1") + .addCache(new MockMicroserviceCache( + microserviceCacheKey, + MicroserviceCacheStatus.REFRESHED)) + .addCache(new MockMicroserviceCache( + MicroserviceCacheKey.builder().serviceName("svc2").appId("app").env("env").build(), + MicroserviceCacheStatus.NO_CHANGE)); + mockServiceRegistry2 = new MockServiceRegistry() + .setName("s2") + .addCache(new MockMicroserviceCache( + microserviceCacheKey, + MicroserviceCacheStatus.SERVICE_NOT_FOUND)); + + aggregateServiceRegistryCache = new AggregateServiceRegistryCache( + Arrays.asList(mockServiceRegistry0, mockServiceRegistry1, mockServiceRegistry2)); + } + + @Test + public void findServiceCache() { + MicroserviceCache serviceCache = aggregateServiceRegistryCache.findServiceCache( + MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build() + ); + + assertTrue(serviceCache instanceof AggregateMicroserviceCache); + AggregateMicroserviceCache aggregateMicroserviceCache = (AggregateMicroserviceCache) serviceCache; + assertEquals(2, aggregateMicroserviceCache.caches.size()); + assertSame(mockServiceRegistry0.findMicroserviceCache(microserviceCacheKey), + aggregateMicroserviceCache.caches.get(mockServiceRegistry0.getName())); + assertSame(mockServiceRegistry1.findMicroserviceCache(microserviceCacheKey), + aggregateMicroserviceCache.caches.get(mockServiceRegistry1.getName())); + // aggregateMicroserviceCache holds the cache of svc + assertEquals(1, aggregateServiceRegistryCache.microserviceCache.size()); + assertNotNull(aggregateServiceRegistryCache.microserviceCache.get(microserviceCacheKey)); + + MicroserviceCache serviceCache2 = aggregateServiceRegistryCache.findServiceCache( + MicroserviceCacheKey.builder().serviceName("svc2").appId("app").env("env").build() + ); + + assertTrue(serviceCache2 instanceof AggregateMicroserviceCache); + AggregateMicroserviceCache aggregateMicroserviceCache2 = (AggregateMicroserviceCache) serviceCache2; + assertEquals(1, aggregateMicroserviceCache2.caches.size()); + assertSame( + mockServiceRegistry1.findMicroserviceCache( + MicroserviceCacheKey.builder().serviceName("svc2").appId("app").env("env").build()), + aggregateMicroserviceCache2.caches.get(mockServiceRegistry1.getName())); + assertEquals(2, aggregateServiceRegistryCache.microserviceCache.size()); + assertNotNull(aggregateServiceRegistryCache.microserviceCache.get( + MicroserviceCacheKey.builder().serviceName("svc2").appId("app").env("env").build() + )); + } + + @Test + public void findServiceCache_not_found() { + MicroserviceCache serviceCache = aggregateServiceRegistryCache.findServiceCache( + MicroserviceCacheKey.builder().serviceName("svc-not-exist").appId("app").env("env").build() + ); + + assertTrue(serviceCache instanceof AggregateMicroserviceCache); + assertEquals(MicroserviceCacheStatus.SERVICE_NOT_FOUND, serviceCache.getStatus()); + AggregateMicroserviceCache aggregateMicroserviceCache = (AggregateMicroserviceCache) serviceCache; + assertEquals(0, aggregateMicroserviceCache.caches.size()); + assertEquals(3, aggregateMicroserviceCache.serviceRegistries.size()); + // should remove the cache of not existing microservice + assertEquals(0, aggregateServiceRegistryCache.microserviceCache.size()); + } + + @Test + public void onMicroserviceCacheRefreshed() { + MicroserviceCacheKey microserviceCacheKey = + MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build(); + MicroserviceCacheKey microserviceCacheKey2 = + MicroserviceCacheKey.builder().serviceName("svc2").appId("app").env("env").build(); + aggregateServiceRegistryCache.onMicroserviceCacheRefreshed(new MicroserviceCacheRefreshedEvent( + Collections.singletonList( + new MockMicroserviceCache( + microserviceCacheKey, + MicroserviceCacheStatus.REFRESHED + ) + ) + )); + + assertTrue(aggregateServiceRegistryCache.microserviceCache.isEmpty()); + + MicroserviceCache serviceCache = aggregateServiceRegistryCache.findServiceCache(microserviceCacheKey); + MicroserviceCache serviceCache2 = aggregateServiceRegistryCache.findServiceCache(microserviceCacheKey2); + + assertEquals("1", serviceCache.getRevisionId()); + assertEquals("1", serviceCache2.getRevisionId()); + + aggregateServiceRegistryCache.onMicroserviceCacheRefreshed(new MicroserviceCacheRefreshedEvent( + Collections.singletonList( + new MockMicroserviceCache( + microserviceCacheKey, + MicroserviceCacheStatus.REFRESHED + ) + ) + )); + + assertEquals("2", serviceCache.getRevisionId()); + assertEquals("1", serviceCache2.getRevisionId()); + + // test watcher + ArrayList<Object> refreshedCaches = new ArrayList<>(); + aggregateServiceRegistryCache.setCacheRefreshedWatcher(refreshedCaches::addAll); + + aggregateServiceRegistryCache.onMicroserviceCacheRefreshed(new MicroserviceCacheRefreshedEvent( + Arrays.asList( + new MockMicroserviceCache( + microserviceCacheKey, + MicroserviceCacheStatus.REFRESHED + ), + new MockMicroserviceCache( + microserviceCacheKey2, + MicroserviceCacheStatus.REFRESHED + ) + ) + )); + + assertEquals("3", serviceCache.getRevisionId()); + assertEquals("2", serviceCache2.getRevisionId()); + assertEquals(2, refreshedCaches.size()); + assertSame(serviceCache, refreshedCaches.get(0)); + assertSame(serviceCache2, refreshedCaches.get(1)); + + refreshedCaches.clear(); + + // test removing not existing service cache + ((MockMicroserviceCache) mockServiceRegistry0.findMicroserviceCache(microserviceCacheKey)) + .setStatus(MicroserviceCacheStatus.SERVICE_NOT_FOUND); + ((MockMicroserviceCache) mockServiceRegistry1.findMicroserviceCache(microserviceCacheKey)) + .setStatus(MicroserviceCacheStatus.SERVICE_NOT_FOUND); + aggregateServiceRegistryCache.onMicroserviceCacheRefreshed(new MicroserviceCacheRefreshedEvent( + Arrays.asList( + new MockMicroserviceCache( + microserviceCacheKey, + MicroserviceCacheStatus.REFRESHED + ), + new MockMicroserviceCache( + microserviceCacheKey2, + MicroserviceCacheStatus.REFRESHED + ) + ) + )); + + assertEquals("4", serviceCache.getRevisionId()); + assertEquals("3", serviceCache2.getRevisionId()); + assertEquals(2, refreshedCaches.size()); + assertSame(serviceCache, refreshedCaches.get(0)); + assertSame(serviceCache2, refreshedCaches.get(1)); + assertEquals(MicroserviceCacheStatus.SERVICE_NOT_FOUND, serviceCache.getStatus()); + // not existing service cache removed, only serviceCache2 is left + assertEquals(1, aggregateServiceRegistryCache.microserviceCache.size()); + assertSame(serviceCache2, aggregateServiceRegistryCache.microserviceCache.get(microserviceCacheKey2)); + } +} \ No newline at end of file diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheKeyTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheKeyTest.java new file mode 100644 index 0000000..b38f83e --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/MicroserviceCacheKeyTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import static org.junit.Assert.fail; + +import org.junit.Assert; +import org.junit.Test; + +public class MicroserviceCacheKeyTest { + + @Test + public void constructors() { + checkConstructorException(null, "appId", "svc", "microserviceCacheKey.env is null"); + checkConstructorException("env", null, "svc", "microserviceCacheKey.appId is null"); + checkConstructorException("env", "appId", null, "microserviceCacheKey.serviceName is null"); + + MicroserviceCacheKey microserviceCacheKey = + MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build(); + Assert.assertEquals("svc", microserviceCacheKey.getServiceName()); + Assert.assertEquals("app", microserviceCacheKey.getAppId()); + Assert.assertEquals("env", microserviceCacheKey.getEnv()); + Assert.assertEquals("svc@app@env", microserviceCacheKey.toString()); + + microserviceCacheKey = + MicroserviceCacheKey.builder().serviceName("app:svc").appId("app").env("env").build(); + Assert.assertEquals("svc", microserviceCacheKey.getServiceName()); + Assert.assertEquals("app", microserviceCacheKey.getAppId()); + Assert.assertEquals("env", microserviceCacheKey.getEnv()); + + microserviceCacheKey = + MicroserviceCacheKey.builder().serviceName("app2:svc").appId("app").env("env").build(); + Assert.assertEquals("svc", microserviceCacheKey.getServiceName()); + Assert.assertEquals("app2", microserviceCacheKey.getAppId()); + Assert.assertEquals("env", microserviceCacheKey.getEnv()); + } + + private void checkConstructorException(String env, String appId, String svc, String expectedMessage) { + try { + MicroserviceCacheKey.builder().env(env).appId(appId).serviceName(svc).build(); + fail("an Exception is expected!"); + } catch (Exception e) { + Assert.assertEquals(expectedMessage, e.getMessage()); + } + } + + @Test + public void equals_and_hashcode() { + MicroserviceCacheKey microserviceCacheKey = + MicroserviceCacheKey.builder().env("env").appId("app").serviceName("svc").build(); + MicroserviceCacheKey microserviceCacheKey2 = + MicroserviceCacheKey.builder().env("env").appId("app").serviceName("svc").build(); + Assert.assertEquals(microserviceCacheKey, microserviceCacheKey2); + Assert.assertEquals(microserviceCacheKey.hashCode(), microserviceCacheKey2.hashCode()); + + microserviceCacheKey2 = + MicroserviceCacheKey.builder().env("env1").appId("app").serviceName("svc").build(); + Assert.assertNotEquals(microserviceCacheKey, microserviceCacheKey2); + microserviceCacheKey2 = + MicroserviceCacheKey.builder().env("env").appId("app1").serviceName("svc").build(); + Assert.assertNotEquals(microserviceCacheKey, microserviceCacheKey2); + microserviceCacheKey2 = + MicroserviceCacheKey.builder().env("env").appId("app").serviceName("svc1").build(); + Assert.assertNotEquals(microserviceCacheKey, microserviceCacheKey2); + } + + @Test + public void plainKey() { + MicroserviceCacheKey microserviceCacheKey = + MicroserviceCacheKey.builder().env("env").appId("app").serviceName("svc").build(); + Assert.assertEquals("svc@app@env", microserviceCacheKey.plainKey()); + } +} \ No newline at end of file diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/MockedMicroserviceCache.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/MockedMicroserviceCache.java new file mode 100644 index 0000000..4d40731 --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/MockedMicroserviceCache.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.List; + +import org.apache.servicecomb.serviceregistry.api.registry.Microservice; +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; +import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; + +public class MockedMicroserviceCache extends RefreshableMicroserviceCache { + public MockedMicroserviceCache() { + super(null, null, null, false); + } + + public MockedMicroserviceCache(Microservice consumerService, MicroserviceCacheKey key, + ServiceRegistryClient srClient, + boolean emptyInstanceProtectionEnabled) { + super(consumerService, key, srClient, emptyInstanceProtectionEnabled); + } + + @Override + public void setStatus(MicroserviceCacheStatus status) { + super.setStatus(status); + } + + public void setInstances(List<MicroserviceInstance> instances) { + this.instances = instances; + } + + public void setRevisionId(String revisionId) { + this.revisionId = revisionId; + } +} diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableMicroserviceCacheTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableMicroserviceCacheTest.java new file mode 100644 index 0000000..0486e00 --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableMicroserviceCacheTest.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +import org.apache.servicecomb.foundation.common.Holder; +import org.apache.servicecomb.serviceregistry.api.registry.Microservice; +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; +import org.apache.servicecomb.serviceregistry.api.response.FindInstancesResponse; +import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; +import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances; +import org.apache.servicecomb.serviceregistry.consumer.MicroserviceInstancePing; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus; +import org.apache.servicecomb.serviceregistry.task.event.SafeModeChangeEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import mockit.Mock; +import mockit.MockUp; + +public class RefreshableMicroserviceCacheTest { + + private Holder<Function<Object[], MicroserviceInstances>> findServiceInstancesOprHolder = new Holder<>(); + + private ServiceRegistryClient srClient; + + private RefreshableMicroserviceCache microserviceCache; + + private List<MicroserviceInstance> pulledInstances = new ArrayList<>(); + + private Microservice consumerService; + + @Before + public void setUp() throws Exception { + srClient = new MockUp<ServiceRegistryClient>() { + @Mock + MicroserviceInstances findServiceInstances(String consumerId, String appId, String serviceName, + String versionRule, String revision) { + return findServiceInstancesOprHolder.value + .apply(new Object[] {consumerId, appId, serviceName, versionRule, revision}); + } + }.getMockInstance(); + consumerService = new Microservice(); + consumerService.setServiceId("consumerId"); + microserviceCache = new RefreshableMicroserviceCache( + consumerService, + MicroserviceCacheKey.builder().env("env").appId("app").serviceName("svc").build(), + srClient, + false); + + findServiceInstancesOprHolder.value = params -> { + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + microserviceInstances.setNeedRefresh(true); + microserviceInstances.setRevision("rev0"); + microserviceInstances.setMicroserviceNotExist(false); + + FindInstancesResponse instancesResponse = new FindInstancesResponse(); + instancesResponse.setInstances(pulledInstances); + microserviceInstances.setInstancesResponse(instancesResponse); + + return microserviceInstances; + }; + } + + @Test + public void forceRefresh() { + MicroserviceInstance microserviceInstance = new MicroserviceInstance(); + microserviceInstance.setInstanceId("instanceId00"); + ArrayList<MicroserviceInstance> instances = new ArrayList<>(); + instances.add(microserviceInstance); + findServiceInstancesOprHolder.value = params -> { + Assert.assertEquals("consumerId", params[0]); + Assert.assertEquals("app", params[1]); + Assert.assertEquals("svc", params[2]); + Assert.assertEquals("0.0.0.0+", params[3]); + Assert.assertNull(params[4]); + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + microserviceInstances.setNeedRefresh(true); + microserviceInstances.setRevision("rev2"); + microserviceInstances.setMicroserviceNotExist(false); + + FindInstancesResponse instancesResponse = new FindInstancesResponse(); + instancesResponse.setInstances(instances); + + microserviceInstances.setInstancesResponse(instancesResponse); + return microserviceInstances; + }; + + microserviceCache.revisionId = "rev"; + microserviceCache.forceRefresh(); + + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + List<MicroserviceInstance> cachedInstances = microserviceCache.getInstances(); + Assert.assertEquals(1, cachedInstances.size()); + MicroserviceInstance instance = cachedInstances.iterator().next(); + Assert.assertEquals("instanceId00", instance.getInstanceId()); + Assert.assertEquals("rev2", microserviceCache.getRevisionId()); + } + + @Test + public void refresh() { + ArrayList<MicroserviceInstance> instances = new ArrayList<>(); + findServiceInstancesOprHolder.value = params -> { + Assert.assertEquals("consumerId", params[0]); + Assert.assertEquals("app", params[1]); + Assert.assertEquals("svc", params[2]); + Assert.assertEquals("0.0.0.0+", params[3]); + Assert.assertNull(params[4]); + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + microserviceInstances.setNeedRefresh(true); + microserviceInstances.setRevision("rev0"); + microserviceInstances.setMicroserviceNotExist(false); + + FindInstancesResponse instancesResponse = new FindInstancesResponse(); + instancesResponse.setInstances(instances); + + microserviceInstances.setInstancesResponse(instancesResponse); + return microserviceInstances; + }; + + // at the beginning, no instances in cache + List<MicroserviceInstance> cachedInstances = microserviceCache.getInstances(); + Assert.assertEquals(0, cachedInstances.size()); + Assert.assertNull(microserviceCache.getRevisionId()); + + // find 1 instance from sc + MicroserviceInstance microserviceInstance = new MicroserviceInstance(); + instances.add(microserviceInstance); + microserviceInstance.setInstanceId("instanceId00"); + + microserviceCache.refresh(); + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + + cachedInstances = microserviceCache.getInstances(); + Assert.assertEquals(1, cachedInstances.size()); + MicroserviceInstance instance = cachedInstances.iterator().next(); + Assert.assertEquals("instanceId00", instance.getInstanceId()); + Assert.assertEquals("rev0", microserviceCache.getRevisionId()); + + // 2nd time, find 2 instances, one of them is the old instance + MicroserviceInstance microserviceInstance1 = new MicroserviceInstance(); + instances.add(microserviceInstance1); + microserviceInstance1.setInstanceId("instanceId01"); + + findServiceInstancesOprHolder.value = params -> { + Assert.assertEquals("consumerId", params[0]); + Assert.assertEquals("app", params[1]); + Assert.assertEquals("svc", params[2]); + Assert.assertEquals("0.0.0.0+", params[3]); + Assert.assertEquals("rev0", params[4]); + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + microserviceInstances.setNeedRefresh(true); + microserviceInstances.setRevision("rev1"); + microserviceInstances.setMicroserviceNotExist(false); + + FindInstancesResponse instancesResponse = new FindInstancesResponse(); + instancesResponse.setInstances(instances); + + microserviceInstances.setInstancesResponse(instancesResponse); + return microserviceInstances; + }; + + microserviceCache.refresh(); + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + cachedInstances = microserviceCache.getInstances(); + Assert.assertEquals(2, cachedInstances.size()); + Assert.assertEquals("instanceId00", cachedInstances.get(0).getInstanceId()); + Assert.assertEquals("instanceId01", cachedInstances.get(1).getInstanceId()); + } + + @Test + public void refresh_service_error() { + findServiceInstancesOprHolder.value = params -> null; + + List<MicroserviceInstance> oldInstanceList = microserviceCache.getInstances(); + + microserviceCache.refresh(); + Assert.assertEquals(MicroserviceCacheStatus.CLIENT_ERROR, microserviceCache.getStatus()); + Assert.assertSame(oldInstanceList, microserviceCache.getInstances()); + } + + @Test + public void refresh_service_not_exist() { + findServiceInstancesOprHolder.value = params -> { + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + microserviceInstances.setMicroserviceNotExist(true); + return microserviceInstances; + }; + + List<MicroserviceInstance> oldInstanceList = microserviceCache.getInstances(); + + microserviceCache.refresh(); + Assert.assertEquals(MicroserviceCacheStatus.SERVICE_NOT_FOUND, microserviceCache.getStatus()); + Assert.assertSame(oldInstanceList, microserviceCache.getInstances()); + } + + @Test + public void refresh_service_no_change() { + findServiceInstancesOprHolder.value = params -> { + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + microserviceInstances.setMicroserviceNotExist(false); + microserviceInstances.setNeedRefresh(false); + return microserviceInstances; + }; + + List<MicroserviceInstance> oldInstanceList = microserviceCache.getInstances(); + + microserviceCache.refresh(); + Assert.assertEquals(MicroserviceCacheStatus.NO_CHANGE, microserviceCache.getStatus()); + Assert.assertSame(oldInstanceList, microserviceCache.getInstances()); + } + + @Test + public void refresh_error_in_setInstances() { + microserviceCache = new RefreshableMicroserviceCache( + consumerService, + MicroserviceCacheKey.builder().env("env").appId("app").serviceName("svc").build(), + srClient, + false) { + @Override + protected Set<MicroserviceInstance> mergeInstances(List<MicroserviceInstance> pulledInstances) { + throw new IllegalStateException("a mock exception"); + } + }; + + List<MicroserviceInstance> oldInstanceList = microserviceCache.getInstances(); + Assert.assertEquals(MicroserviceCacheStatus.INIT, microserviceCache.getStatus()); + + microserviceCache.refresh(); + + Assert.assertEquals(MicroserviceCacheStatus.SETTING_CACHE_ERROR, microserviceCache.getStatus()); + List<MicroserviceInstance> newInstanceList = microserviceCache.getInstances(); + Assert.assertEquals(0, newInstanceList.size()); + Assert.assertSame(oldInstanceList, newInstanceList); + } + + @Test + public void refresh_safe_mode() { + microserviceCache.instances = new ArrayList<>(); + MicroserviceInstance instance0 = new MicroserviceInstance(); + instance0.setInstanceId("instanceId0"); + microserviceCache.instances.add(instance0); + + pulledInstances = new ArrayList<>(); + MicroserviceInstance instance1 = new MicroserviceInstance(); + instance1.setInstanceId("instanceId1"); + pulledInstances.add(instance1); + + microserviceCache.refresh(); + + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + Assert.assertEquals(1, microserviceCache.getInstances().size()); + Assert.assertEquals("instanceId1", microserviceCache.getInstances().get(0).getInstanceId()); + + // enter safe mode + microserviceCache.onSafeModeChanged(new SafeModeChangeEvent(true)); + + pulledInstances = new ArrayList<>(); + MicroserviceInstance instance2 = new MicroserviceInstance(); + instance2.setInstanceId("instanceId2"); + pulledInstances.add(instance2); + + microserviceCache.refresh(); + + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + Assert.assertEquals(2, microserviceCache.getInstances().size()); + Assert.assertEquals("instanceId2", microserviceCache.getInstances().get(0).getInstanceId()); + Assert.assertEquals("instanceId1", microserviceCache.getInstances().get(1).getInstanceId()); + + // exit safe mode + microserviceCache.onSafeModeChanged(new SafeModeChangeEvent(false)); + + pulledInstances = new ArrayList<>(); + MicroserviceInstance instance3 = new MicroserviceInstance(); + instance3.setInstanceId("instanceId3"); + pulledInstances.add(instance3); + + microserviceCache.refresh(); + + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + Assert.assertEquals(1, microserviceCache.getInstances().size()); + Assert.assertEquals("instanceId3", microserviceCache.getInstances().get(0).getInstanceId()); + } + + @Test + public void refresh_empty_instance_protection_disabled() { + microserviceCache.instances = new ArrayList<>(); + MicroserviceInstance instance0 = new MicroserviceInstance(); + instance0.setInstanceId("instanceId0"); + microserviceCache.instances.add(instance0); + + pulledInstances = new ArrayList<>(); + microserviceCache.refresh(); + + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + Assert.assertEquals(0, microserviceCache.getInstances().size()); + } + + @Test + public void refresh_empty_instance_protection_enabled() { + microserviceCache.setEmptyInstanceProtectionEnabled(true); + microserviceCache.instancePing = new MicroserviceInstancePing() { + @Override + public int getOrder() { + return 0; + } + + @Override + public boolean ping(MicroserviceInstance instance) { + return true; + } + }; + microserviceCache.instances = new ArrayList<>(); + MicroserviceInstance instance0 = new MicroserviceInstance(); + instance0.setInstanceId("instanceId0"); + microserviceCache.instances.add(instance0); + + pulledInstances = new ArrayList<>(); + microserviceCache.refresh(); + + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + Assert.assertEquals(1, microserviceCache.getInstances().size()); + Assert.assertEquals("instanceId0", microserviceCache.getInstances().get(0).getInstanceId()); + } + + @Test + public void set_consumer_service_id() { + Holder<Integer> assertCounter = new Holder<>(0); + Function<Object[], MicroserviceInstances> preservedLogic = findServiceInstancesOprHolder.value; + findServiceInstancesOprHolder.value = params -> { + Assert.assertEquals("consumerId", params[0]); + assertCounter.value++; + return preservedLogic.apply(params); + }; + microserviceCache.refresh(); + + consumerService.setServiceId("consumerId2"); + + findServiceInstancesOprHolder.value = params -> { + Assert.assertEquals("consumerId2", params[0]); + assertCounter.value++; + return preservedLogic.apply(params); + }; + microserviceCache.refresh(); + Assert.assertEquals(Integer.valueOf(2), assertCounter.value); + } +} \ No newline at end of file diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableServiceRegistryCacheTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableServiceRegistryCacheTest.java new file mode 100644 index 0000000..af6e9d3 --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/registry/cache/RefreshableServiceRegistryCacheTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry.registry.cache; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; +import java.util.function.Function; + +import org.apache.servicecomb.foundation.common.Holder; +import org.apache.servicecomb.serviceregistry.api.registry.Microservice; +import org.apache.servicecomb.serviceregistry.api.response.FindInstancesResponse; +import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class RefreshableServiceRegistryCacheTest { + + private Holder<Function<String, MicroserviceInstances>> pullInstanceFromServiceCenterLogic = new Holder<>( + rev -> { + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + microserviceInstances.setMicroserviceNotExist(false); + microserviceInstances.setNeedRefresh(true); + microserviceInstances.setRevision(rev); + FindInstancesResponse instancesResponse = new FindInstancesResponse(); + instancesResponse.setInstances(new ArrayList<>()); + microserviceInstances.setInstancesResponse(instancesResponse); + return microserviceInstances; + } + ); + + private RefreshableServiceRegistryCache serviceRegistryCache; + + private Microservice consumerService; + + @Before + public void setUp() throws Exception { + serviceRegistryCache = new RefreshableServiceRegistryCache(consumerService, null) { + @Override + RefreshableMicroserviceCache createMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) { + return new RefreshableMicroserviceCache(consumerService, microserviceCacheKey, null, false) { + @Override + MicroserviceInstances pullInstanceFromServiceCenter(String revisionId) { + return pullInstanceFromServiceCenterLogic.value.apply(revisionId); + } + }; + } + }; + consumerService = new Microservice(); + consumerService.setServiceId("testConsumer"); + } + + @Test + public void find_service_instances() { + MicroserviceCache microserviceCache = serviceRegistryCache + .findServiceCache(MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build()); + + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, microserviceCache.getStatus()); + Assert.assertEquals(0, microserviceCache.getInstances().size()); + Assert.assertEquals(1, serviceRegistryCache.microserviceCache.size()); + Entry<MicroserviceCacheKey, RefreshableMicroserviceCache> cacheEntry = + serviceRegistryCache.microserviceCache.entrySet().iterator().next(); + Assert.assertEquals(MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build(), + cacheEntry.getKey()); + } + + @Test + public void refreshCache() { + RefreshableMicroserviceCache microserviceCache = new RefreshableMicroserviceCache( + consumerService, + MicroserviceCacheKey.builder().serviceName("svc").appId("appId").env("env").build(), + null, false) { + @Override + public void refresh() { + this.status = MicroserviceCacheStatus.REFRESHED; + } + }; + RefreshableMicroserviceCache microserviceCache2 = new RefreshableMicroserviceCache( + consumerService, + MicroserviceCacheKey.builder().serviceName("svc2").appId("appId").env("env").build(), + null, false); + RefreshableMicroserviceCache microserviceCache3 = new RefreshableMicroserviceCache( + consumerService, + MicroserviceCacheKey.builder().serviceName("svc3").appId("appId").env("env").build(), + null, false) { + @Override + public void refresh() { + this.status = MicroserviceCacheStatus.SERVICE_NOT_FOUND; + } + }; + + serviceRegistryCache.microserviceCache.put(microserviceCache.getKey(), microserviceCache); + serviceRegistryCache.microserviceCache.put(microserviceCache2.getKey(), microserviceCache2); + serviceRegistryCache.microserviceCache.put(microserviceCache3.getKey(), microserviceCache3); + + List<MicroserviceCache> refreshedCaches = new ArrayList<>(); + serviceRegistryCache.setCacheRefreshedWatcher(refreshedCaches::addAll); + + serviceRegistryCache.refreshCache(); + + Assert.assertEquals(2, refreshedCaches.size()); + Assert.assertSame(microserviceCache.getKey(), refreshedCaches.get(0).getKey()); + Assert.assertSame(microserviceCache3.getKey(), refreshedCaches.get(1).getKey()); + Assert.assertEquals(2, serviceRegistryCache.microserviceCache.size()); + Assert.assertSame(microserviceCache, serviceRegistryCache.microserviceCache.get(microserviceCache.getKey())); + Assert.assertSame(microserviceCache2, serviceRegistryCache.microserviceCache.get(microserviceCache2.getKey())); + } + + @Test + public void forceRefreshCache() { + RefreshableMicroserviceCache microserviceCache = new RefreshableMicroserviceCache( + consumerService, + MicroserviceCacheKey.builder().serviceName("svc").appId("appId").env("env").build(), + null, false) { + @Override + public void forceRefresh() { + this.status = MicroserviceCacheStatus.REFRESHED; + } + }; + + serviceRegistryCache.microserviceCache.put(microserviceCache.getKey(), microserviceCache); + + List<MicroserviceCache> refreshedCaches = new ArrayList<>(); + serviceRegistryCache.setCacheRefreshedWatcher(refreshedCaches::addAll); + + serviceRegistryCache.forceRefreshCache(); + + Assert.assertEquals(1, refreshedCaches.size()); + Assert.assertSame(microserviceCache.getKey(), refreshedCaches.get(0).getKey()); + } + + @Test + public void findServiceCache_normal() { + mockServiceRegistryHolder().value = MicroserviceCacheStatus.REFRESHED; + + MicroserviceCacheKey cacheKey = MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build(); + MicroserviceCache serviceCache = serviceRegistryCache.findServiceCache(cacheKey); + + Assert.assertSame(cacheKey, serviceCache.getKey()); + Assert.assertEquals(MicroserviceCacheStatus.REFRESHED, serviceCache.getStatus()); + Assert.assertEquals(1, serviceRegistryCache.microserviceCache.size()); + Assert.assertSame(serviceCache, serviceRegistryCache.microserviceCache.get(cacheKey)); + } + + @Test + public void findServiceCache_client_error() { + mockServiceRegistryHolder().value = MicroserviceCacheStatus.CLIENT_ERROR; + + MicroserviceCacheKey cacheKey = MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build(); + MicroserviceCache serviceCache = serviceRegistryCache.findServiceCache(cacheKey); + + Assert.assertSame(cacheKey, serviceCache.getKey()); + Assert.assertEquals(MicroserviceCacheStatus.CLIENT_ERROR, serviceCache.getStatus()); + Assert.assertEquals(1, serviceRegistryCache.microserviceCache.size()); + Assert.assertSame(serviceCache, serviceRegistryCache.microserviceCache.get(cacheKey)); + } + + @Test + public void findServiceCache_service_not_found() { + mockServiceRegistryHolder().value = MicroserviceCacheStatus.SERVICE_NOT_FOUND; + + MicroserviceCacheKey cacheKey = MicroserviceCacheKey.builder().serviceName("svc").appId("app").env("env").build(); + MicroserviceCache serviceCache = serviceRegistryCache.findServiceCache(cacheKey); + + Assert.assertSame(cacheKey, serviceCache.getKey()); + Assert.assertEquals(MicroserviceCacheStatus.SERVICE_NOT_FOUND, serviceCache.getStatus()); + Assert.assertTrue(serviceRegistryCache.microserviceCache.isEmpty()); + } + + private Holder<MicroserviceCacheStatus> mockServiceRegistryHolder() { + Holder<MicroserviceCacheStatus> statusHolder = new Holder<>(); + serviceRegistryCache = new RefreshableServiceRegistryCache(consumerService, null) { + @Override + RefreshableMicroserviceCache createMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) { + return new RefreshableMicroserviceCache( + consumerService, + microserviceCacheKey, + null, false) { + @Override + public void refresh() { + this.status = statusHolder.value; + } + }; + } + }; + return statusHolder; + } +} \ No newline at end of file
