This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 0ac1567 Fix metadata equals and instance listener (#8275)
0ac1567 is described below
commit 0ac1567837d67db68b4580385881f48a17475d3b
Author: ken.lj <[email protected]>
AuthorDate: Fri Jul 16 19:40:15 2021 +0800
Fix metadata equals and instance listener (#8275)
---
.../context/DubboBootstrapApplicationListener.java | 1 +
.../org/apache/dubbo/metadata/MetadataInfo.java | 11 +-
.../metadata/DefaultMetadataParamsFilter.java | 21 +-
.../apache/dubbo/metadata/MetadataInfoTest.java | 38 +-
.../org.apache.dubbo.metadata.MetadataParamsFilter | 1 +
.../event/RetryServiceInstancesChangedEvent.java | 6 +-
.../listener/ServiceInstancesChangedListener.java | 82 ++--
.../registry/client/metadata/MetadataUtils.java | 2 +-
.../ServiceInstancesChangedListenerTest.java | 458 ++++++++++++++++++++-
.../migration/MigrationRuleListenerTest.java | 5 +-
.../registry/dns/DNSServiceDiscoveryTest.java | 1 +
pom.xml | 6 +
12 files changed, 576 insertions(+), 56 deletions(-)
diff --git
a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/context/DubboBootstrapApplicationListener.java
b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/context/DubboBootstrapApplicationListener.java
index 6be80b3..beae589 100644
---
a/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/context/DubboBootstrapApplicationListener.java
+++
b/dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/context/DubboBootstrapApplicationListener.java
@@ -119,6 +119,7 @@ public class DubboBootstrapApplicationListener implements
ApplicationListener, A
private void onContextClosedEvent(ContextClosedEvent event) {
if (dubboBootstrap.getTakeoverMode() == BootstrapTakeoverMode.SPRING) {
+ // will call dubboBootstrap.stop() through shutdown callback.
DubboShutdownHook.getDubboShutdownHook().run();
}
}
diff --git
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index a3b34c2..d5b7c35 100644
---
a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++
b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -467,14 +467,19 @@ public class MetadataInfo implements Serializable {
ServiceInfo serviceInfo = (ServiceInfo) obj;
// return this.getMatchKey().equals(serviceInfo.getMatchKey()) &&
this.getParams().equals(serviceInfo.getParams());
- // Please check
ServiceInstancesChangedListener.localServiceToRevisions before change this
behaviour.
- return this.getMatchKey().equals(serviceInfo.getMatchKey());
+ // Please check
ServiceInstancesChangedListener.localServiceToRevisions before changing this
behaviour.
+ // Equivalent to Objects.equals(this.getMatchKey(),
serviceInfo.getMatchKey()), but the match key does not get initialized
+ // on JSON deserialization.
+ return Objects.equals(this.getVersion(), serviceInfo.getVersion())
+ && Objects.equals(this.getGroup(), serviceInfo.getGroup())
+ && Objects.equals(this.getName(), serviceInfo.getName())
+ && Objects.equals(this.getProtocol(),
serviceInfo.getProtocol());
}
@Override
public int hashCode() {
// return Objects.hash(getMatchKey(), getParams());
- return Objects.hash(getMatchKey());
+ return Objects.hash(getVersion(), getGroup(), getName(),
getProtocol());
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
b/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/DefaultMetadataParamsFilter.java
similarity index 73%
copy from
dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
copy to
dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/DefaultMetadataParamsFilter.java
index 5095f11..1b14b5c 100644
---
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++
b/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/DefaultMetadataParamsFilter.java
@@ -14,19 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.registry.client.event.listener;
+package org.apache.dubbo.metadata;
-import org.junit.jupiter.api.Test;
-
-/**
- * {@link ServiceInstancesChangedListener} Test
- *
- * @since 2.7.5
- */
-public class ServiceInstancesChangedListenerTest {
-
- @Test
- public void testOnEvent() {
+public class DefaultMetadataParamsFilter implements MetadataParamsFilter {
+ @Override
+ public String[] serviceParamsIncluded() {
+ return new String[0];
+ }
+ @Override
+ public String[] instanceParamsIncluded() {
+ return new String[0];
}
}
diff --git
a/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
b/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
index fdb7021..f9843b3 100644
---
a/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
+++
b/dubbo-metadata/dubbo-metadata-api/src/test/java/org/apache/dubbo/metadata/MetadataInfoTest.java
@@ -18,11 +18,23 @@ package org.apache.dubbo.metadata;
import org.apache.dubbo.common.URL;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
+
public class MetadataInfoTest {
+
+ @Test
+ public void testEmptyRevision() {
+ MetadataInfo metadataInfo = new MetadataInfo("demo");
+ metadataInfo.setApp("demo");
+
+ Assertions.assertEquals(EMPTY_REVISION,
metadataInfo.calAndGetRevision());
+ }
+
@Test
- public void revisionTest() {
+ public void testRevisionNotChange() {
MetadataInfo metadataInfo = new MetadataInfo("demo");
metadataInfo.setApp("demo");
@@ -33,4 +45,28 @@ public class MetadataInfoTest {
System.out.println(serviceInfo.toDescString());
System.out.println(metadataInfo.calAndGetRevision());
}
+
+ @Test
+ public void testParamsFiltered() {
+ MetadataInfo metadataInfo = new MetadataInfo("demo");
+ metadataInfo.setApp("demo");
+
+
+ }
+
+ @Test
+ public void testParamsStatusChangeAsExpected() {
+ MetadataInfo metadataInfo = new MetadataInfo("demo");
+ metadataInfo.setApp("demo");
+
+
+ }
+
+ @Test
+ public void testEquals() {
+ MetadataInfo metadataInfo = new MetadataInfo("demo");
+ metadataInfo.setApp("demo");
+
+
+ }
}
diff --git
a/dubbo-metadata/dubbo-metadata-api/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataParamsFilter
b/dubbo-metadata/dubbo-metadata-api/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataParamsFilter
new file mode 100644
index 0000000..eb21165
--- /dev/null
+++
b/dubbo-metadata/dubbo-metadata-api/src/test/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.MetadataParamsFilter
@@ -0,0 +1 @@
+test-default=org.apache.dubbo.metadata.DefaultMetadataParamsFilter
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/RetryServiceInstancesChangedEvent.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/RetryServiceInstancesChangedEvent.java
index f796071..9172faf 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/RetryServiceInstancesChangedEvent.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/RetryServiceInstancesChangedEvent.java
@@ -16,6 +16,8 @@
*/
package org.apache.dubbo.registry.client.event;
+import java.util.Collections;
+
/**
* A retry event created after a failure.
*/
@@ -23,8 +25,8 @@ public class RetryServiceInstancesChangedEvent extends
ServiceInstancesChangedEv
private volatile long failureRecordTime;
- public RetryServiceInstancesChangedEvent() {
- super();
+ public RetryServiceInstancesChangedEvent(String serviceName) {
+ super(serviceName, Collections.emptyList());// instance list has been
stored by ServiceInstancesChangedListener
this.failureRecordTime = System.currentTimeMillis();
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 6eedc18..890e046 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -49,9 +49,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
@@ -78,8 +78,6 @@ public class ServiceInstancesChangedListener {
protected Map<String, MetadataInfo> revisionToMetadata;
private volatile long lastRefreshTime;
- private volatile long lastFailureTime;
- private volatile AtomicInteger failureCounter = new AtomicInteger(0);
private Semaphore retryPermission;
private volatile ScheduledFuture<?> retryFuture;
private static ScheduledExecutorService scheduler =
ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getMetadataRetryExecutor();
@@ -114,6 +112,7 @@ public class ServiceInstancesChangedListener {
Map<String, Object> newServiceUrls = new HashMap<>();//TODO
Map<String, MetadataInfo> newRevisionToMetadata = new HashMap<>();
+ // grouping all instances of this app(service name) by revision
for (Map.Entry<String, List<ServiceInstance>> entry :
allInstances.entrySet()) {
List<ServiceInstance> instances = entry.getValue();
for (ServiceInstance instance : instances) {
@@ -126,17 +125,21 @@ public class ServiceInstancesChangedListener {
}
List<ServiceInstance> subInstances =
revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
subInstances.add(instance);
+ }
+ }
- MetadataInfo metadata = getRemoteMetadata(instance, revision,
localServiceToRevisions, subInstances);
-
- if (metadata==null || metadata == MetadataInfo.EMPTY) {
- // it means fetching Meta Server failed if metadata is
null, ignore this instance
- subInstances.remove(instance);
- continue;
- }
- ((DefaultServiceInstance)
instance).setServiceMetadata(metadata);
- newRevisionToMetadata.putIfAbsent(revision, metadata);
+ // get MetadataInfo with revision
+ for (Map.Entry<String, List<ServiceInstance>> entry :
revisionToInstances.entrySet()) {
+ String revision = entry.getKey();
+ List<ServiceInstance> subInstances = entry.getValue();
+ ServiceInstance instance = selectInstance(subInstances);
+ MetadataInfo metadata = getRemoteMetadata(revision,
localServiceToRevisions, instance);
+ // update the metadata on each instance, in case a new instance was
created.
+ for (ServiceInstance tmpInstance : subInstances) {
+
((DefaultServiceInstance)tmpInstance).setServiceMetadata(metadata);
}
+// ((DefaultServiceInstance) instance).setServiceMetadata(metadata);
+ newRevisionToMetadata.putIfAbsent(revision, metadata);
}
if(logger.isDebugEnabled()) {
@@ -145,12 +148,11 @@ public class ServiceInstancesChangedListener {
if (hasEmptyMetadata(newRevisionToMetadata)) {// retry every 10 seconds
if (retryPermission.tryAcquire()) {
- retryFuture = scheduler.schedule(new
AddressRefreshRetryTask(retryPermission), 10000, TimeUnit.MILLISECONDS);
+ retryFuture = scheduler.schedule(new
AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10000,
TimeUnit.MILLISECONDS);
logger.warn("Address refresh try task submitted.");
}
-// logger.warn("Address refresh failed because of Metadata Server
failure, wait for retry or new address refresh event.");
-// this.revisionToMetadata = newRevisionToMetadata;
-// return;
+ logger.error("Address refresh failed because of Metadata Server
failure, wait for retry or new address refresh event.");
+ return;
}
this.revisionToMetadata = newRevisionToMetadata;
@@ -265,33 +267,37 @@ public class ServiceInstancesChangedListener {
return false;
}
- protected MetadataInfo getRemoteMetadata(ServiceInstance instance, String
revision, Map<ServiceInfo, Set<String>> localServiceToRevisions,
List<ServiceInstance> subInstances) {
+ protected MetadataInfo getRemoteMetadata(String revision, Map<ServiceInfo,
Set<String>> localServiceToRevisions, ServiceInstance instance) {
MetadataInfo metadata = revisionToMetadata.get(revision);
if (metadata != null && metadata != MetadataInfo.EMPTY) {
+ // metadata loaded from cache
if (logger.isDebugEnabled()) {
logger.debug("MetadataInfo for instance " +
instance.getAddress() + "?revision=" + revision + "&cluster=" +
instance.getRegistryCluster() + ", " + metadata);
}
+ parseMetadata(revision, metadata, localServiceToRevisions);
+ return metadata;
}
- if (metadata == null
- || (metadata == MetadataInfo.EMPTY && (failureCounter.get() <
3 || (System.currentTimeMillis() - lastFailureTime > 10000)))) {
- metadata = getMetadataInfo(instance);
+ // try to load metadata from remote.
+ int triedTimes = 0;
+ while (triedTimes < 3) {
+ metadata = doGetMetadataInfo(instance);
- if (metadata != MetadataInfo.EMPTY) {
- failureCounter.set(0);
- revisionToMetadata.putIfAbsent(revision, metadata);
+ if (metadata != MetadataInfo.EMPTY) {// succeeded
parseMetadata(revision, metadata, localServiceToRevisions);
- } else {
+ break;
+ } else {// failed
logger.error("Failed to get MetadataInfo for instance " +
instance.getAddress() + "?revision=" + revision
- + "&cluster=" + instance.getRegistryCluster() + ",
wait for retry.");
- lastFailureTime = System.currentTimeMillis();
- failureCounter.incrementAndGet();
+ + "&cluster=" + instance.getRegistryCluster() + ", wait
for retry.");
+ triedTimes++;
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {}
}
- } else if (metadata != MetadataInfo.EMPTY && subInstances.size() == 1)
{
- // "subInstances.size() >= 2" means metadata of this revision has
been parsed, ignore
- parseMetadata(revision, metadata, localServiceToRevisions);
}
+
+ revisionToMetadata.putIfAbsent(revision, metadata);
return metadata;
}
@@ -305,7 +311,7 @@ public class ServiceInstancesChangedListener {
return localServiceToRevisions;
}
- protected MetadataInfo getMetadataInfo(ServiceInstance instance) {
+ protected MetadataInfo doGetMetadataInfo(ServiceInstance instance) {
String metadataType =
ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
// FIXME, check "REGISTRY_CLUSTER_KEY" must be set by every registry
implementation.
if (instance.getRegistryCluster() == null) {
@@ -320,6 +326,7 @@ public class ServiceInstancesChangedListener {
RemoteMetadataServiceImpl remoteMetadataService =
MetadataUtils.getRemoteMetadataService();
metadataInfo = remoteMetadataService.getMetadata(instance);
} else {
+ // change the instance used to communicate, to avoid routing all
requests to the same instance
MetadataService metadataServiceProxy =
MetadataUtils.getMetadataServiceProxy(instance, serviceDiscovery);
metadataInfo =
metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
}
@@ -334,6 +341,13 @@ public class ServiceInstancesChangedListener {
return metadataInfo;
}
+ private ServiceInstance selectInstance(List<ServiceInstance> instances) {
+ if (instances.size() == 1) {
+ return instances.get(0);
+ }
+ return instances.get(ThreadLocalRandom.current().nextInt(0,
instances.size()));
+ }
+
protected Object getServiceUrlsCache(Map<String, List<ServiceInstance>>
revisionToInstances, Set<String> revisions, String protocol) {
List<URL> urls;
urls = new ArrayList<>();
@@ -412,12 +426,12 @@ public class ServiceInstancesChangedListener {
return Objects.hash(getClass(), getServiceNames());
}
- private class AddressRefreshRetryTask implements Runnable {
+ protected class AddressRefreshRetryTask implements Runnable {
private final RetryServiceInstancesChangedEvent retryEvent;
private final Semaphore retryPermission;
- public AddressRefreshRetryTask(Semaphore semaphore) {
- this.retryEvent = new RetryServiceInstancesChangedEvent();
+ public AddressRefreshRetryTask(Semaphore semaphore, String
serviceName) {
+ this.retryEvent = new
RetryServiceInstancesChangedEvent(serviceName);
this.retryPermission = semaphore;
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
index 42382f8..fd0c855 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
@@ -134,7 +134,7 @@ public class MetadataUtils {
List<URL> urls = builder.build(instance);
if (CollectionUtils.isEmpty(urls)) {
- throw new IllegalStateException("You have enabled introspection
service discovery mode for instance "
+ throw new IllegalStateException("Introspection service discovery
mode is enabled "
+ instance + ", but no metadata service can build from
it.");
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
index 5095f11..1303a09 100644
---
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
@@ -16,17 +16,473 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
+import org.apache.dubbo.metadata.MetadataService;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.InstanceAddressURL;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.metadata.MetadataUtils;
+
+import com.google.gson.Gson;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
+import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
+import static
org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.eq;
/**
* {@link ServiceInstancesChangedListener} Test
*
* @since 2.7.5
*/
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class ServiceInstancesChangedListenerTest {
+ private static Gson gson = new Gson();
+
+ static List<ServiceInstance> app1Instances;
+ static List<ServiceInstance> app2Instances;
+ static List<ServiceInstance> app1FailedInstances;
+ static List<ServiceInstance> app1FailedInstances2;
+
+ static String metadata_111 =
"{\"app\":\"app1\",\"revision\":\"111\",\"services\":{"
+ +
"\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\
[...]
+ + "}}";
+ static String metadata_222 =
"{\"app\":\"app2\",\"revision\":\"333\",\"services\":{"
+ +
"\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\
[...]
+ +
"\"org.apache.dubbo.demo.DemoService2:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService2\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService2\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService2\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"de
[...]
+ + "}}";
+ static String metadata_333 =
"{\"app\":\"app2\",\"revision\":\"333\",\"services\":{"
+ +
"\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\
[...]
+ +
"\"org.apache.dubbo.demo.DemoService2:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService2\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService2\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService2\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"de
[...]
+ +
"\"org.apache.dubbo.demo.DemoService3:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService3\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService3\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService3\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"de
[...]
+ + "}}";
+ // failed
+ static String metadata_444 =
"{\"app\":\"app1\",\"revision\":\"444\",\"services\":{"
+ +
"\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\
[...]
+ + "}}";
+
+ static String service1 = "org.apache.dubbo.demo.DemoService";
+ static String service2 = "org.apache.dubbo.demo.DemoService2";
+ static String service3 = "org.apache.dubbo.demo.DemoService3";
+
+ static URL consumerURL =
URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService?registry_cluster=default");
+
+ static MetadataInfo metadataInfo_111;
+ static MetadataInfo metadataInfo_222;
+ static MetadataInfo metadataInfo_333;
+ static MetadataInfo metadataInfo_444;
+
+ static MetadataService metadataService;
+
+ static ServiceDiscovery serviceDiscovery;
+
+ @BeforeAll
+ public static void setUp() {
+ List<Object> urlsSameRevision = new ArrayList<>();
+ urlsSameRevision.add("127.0.0.1:20880?revision=111");
+ urlsSameRevision.add("127.0.0.2:20880?revision=111");
+ urlsSameRevision.add("127.0.0.3:20880?revision=111");
+
+ List<Object> urlsDifferentRevision = new ArrayList<>();
+ urlsDifferentRevision.add("30.10.0.1:20880?revision=222");
+ urlsDifferentRevision.add("30.10.0.2:20880?revision=222");
+ urlsDifferentRevision.add("30.10.0.3:20880?revision=333");
+ urlsDifferentRevision.add("30.10.0.4:20880?revision=333");
+
+ List<Object> urlsFailedRevision = new ArrayList<>();
+ urlsFailedRevision.add("30.10.0.5:20880?revision=222");
+ urlsFailedRevision.add("30.10.0.6:20880?revision=222");
+ urlsFailedRevision.add("30.10.0.7:20880?revision=444");// revision
will fail
+ urlsFailedRevision.add("30.10.0.8:20880?revision=444");// revision
will fail
+
+ List<Object> urlsFailedRevision2 = new ArrayList<>();
+ urlsFailedRevision2.add("30.10.0.1:20880?revision=222");
+ urlsFailedRevision2.add("30.10.0.2:20880?revision=222");
+
+ app1Instances = buildInstances(urlsSameRevision);
+ app2Instances = buildInstances(urlsDifferentRevision);
+ app1FailedInstances = buildInstances(urlsFailedRevision);
+ app1FailedInstances2 = buildInstances(urlsFailedRevision2);
+
+ metadataInfo_111 = gson.fromJson(metadata_111, MetadataInfo.class);
+ metadataInfo_222 = gson.fromJson(metadata_222, MetadataInfo.class);
+ metadataInfo_333 = gson.fromJson(metadata_333, MetadataInfo.class);
+ metadataInfo_444 = gson.fromJson(metadata_444, MetadataInfo.class);
+
+ metadataService = Mockito.mock(MetadataService.class);
+
Mockito.doReturn(metadataInfo_111).when(metadataService).getMetadataInfo("111");
+
Mockito.doReturn(metadataInfo_222).when(metadataService).getMetadataInfo("222");
+
Mockito.doReturn(metadataInfo_333).when(metadataService).getMetadataInfo("333");
+
Mockito.doThrow(IllegalStateException.class).when(metadataService).getMetadataInfo("444");
+
+ serviceDiscovery = Mockito.mock(ServiceDiscovery.class);
+ }
+
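+ // Normal scenario. Basic address-notification flow for a single application (app1); only associates instances with metadata, without parsing the metadata content.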
+ @Test
+ @Order(1)
+ public void testInstanceNotification() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ ServiceDiscovery serviceDiscovery =
Mockito.mock(ServiceDiscovery.class);
+ ServiceInstancesChangedListener spyListener = Mockito.spy(new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery));
+
Mockito.doReturn(metadataInfo_111).when(spyListener).getRemoteMetadata(eq("111"),
Mockito.anyMap(), Mockito.any());
+ ServiceInstancesChangedEvent event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ spyListener.onEvent(event);
+
+ Map<String, List<ServiceInstance>> allInstances =
spyListener.getAllInstances();
+ Assertions.assertEquals(1, allInstances.size());
+ Assertions.assertEquals(3, allInstances.get("app1").size());
+
+ Map<String, MetadataInfo> revisionToMetadata =
spyListener.getRevisionToMetadata();
+ Assertions.assertEquals(1, revisionToMetadata.size());
+ Assertions.assertEquals(metadataInfo_111,
revisionToMetadata.get("111"));
+
+// // test app2 notification
+//
Mockito.doReturn(metadataInfo_222).when(spyListener).getRemoteMetadata(eq("222"),
Mockito.anyMap(), Mockito.anyList());
+//
Mockito.doReturn(metadataInfo_333).when(spyListener).getRemoteMetadata(eq("333"),
Mockito.anyMap(), Mockito.anyList());
+//
+// ServiceInstancesChangedEvent event_app2 = new
ServiceInstancesChangedEvent("app2", app2Instances);
+// spyListener.onEvent(event_app2);
+
+ }
+
+ // Normal scenario. Single application (app1); further checks whether the metadata services are mapped correctly.
+ @Test
+ @Order(2)
+ public void testInstanceNotificationAndMetadataParse() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ ServiceInstancesChangedListener listener = new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+
+ try (MockedStatic<MetadataUtils> mockedMetadataUtils =
Mockito.mockStatic(MetadataUtils.class)) {
+ mockedMetadataUtils.when(() ->
MetadataUtils.getMetadataServiceProxy(Mockito.any(),
Mockito.any())).thenReturn(metadataService);
+ // notify instance change
+ ServiceInstancesChangedEvent event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(event);
+
+ Map<String, List<ServiceInstance>> allInstances =
listener.getAllInstances();
+ Assertions.assertEquals(1, allInstances.size());
+ Assertions.assertEquals(3, allInstances.get("app1").size());
+
+ Map<String, MetadataInfo> revisionToMetadata =
listener.getRevisionToMetadata();
+ Assertions.assertEquals(1, revisionToMetadata.size());
+ Assertions.assertEquals(metadataInfo_111,
revisionToMetadata.get("111"));
+
+ List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ Assertions.assertEquals(3, serviceUrls.size());
+ Assertions.assertTrue(serviceUrls.get(0) instanceof
InstanceAddressURL);
+
+ assertThat(serviceUrls,
Matchers.hasItem(Matchers.hasProperty("instance", Matchers.notNullValue())));
+ assertThat(serviceUrls,
Matchers.hasItem(Matchers.hasProperty("metadataInfo",
Matchers.notNullValue())));
+
+ }
+ }
+
+ // Normal scenario. Multiple applications; app1 and app2 each notify their addresses separately.
+ @Test
+ @Order(3)
+ public void testMultipleAppNotification() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ serviceNames.add("app2");
+ ServiceInstancesChangedListener listener = new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+
+ try (MockedStatic<MetadataUtils> mockedMetadataUtils =
Mockito.mockStatic(MetadataUtils.class)) {
+ mockedMetadataUtils.when(() ->
MetadataUtils.getMetadataServiceProxy(Mockito.any(),
Mockito.any())).thenReturn(metadataService);
+ // notify app1 instance change
+ ServiceInstancesChangedEvent app1_event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(app1_event);
+
+ // notify app2 instance change
+ ServiceInstancesChangedEvent app2_event = new
ServiceInstancesChangedEvent("app2", app2Instances);
+ listener.onEvent(app2_event);
+
+ // check
+ Map<String, List<ServiceInstance>> allInstances =
listener.getAllInstances();
+ Assertions.assertEquals(2, allInstances.size());
+ Assertions.assertEquals(3, allInstances.get("app1").size());
+ Assertions.assertEquals(4, allInstances.get("app2").size());
+
+ Map<String, MetadataInfo> revisionToMetadata =
listener.getRevisionToMetadata();
+ Assertions.assertEquals(3, revisionToMetadata.size());
+ Assertions.assertEquals(metadataInfo_111,
revisionToMetadata.get("111"));
+ Assertions.assertEquals(metadataInfo_222,
revisionToMetadata.get("222"));
+ Assertions.assertEquals(metadataInfo_333,
revisionToMetadata.get("333"));
+
+ List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ Assertions.assertEquals(7, serviceUrls.size());
+ List<URL> serviceUrls2 = listener.getAddresses(service2 +
":dubbo", consumerURL);
+ Assertions.assertEquals(4, serviceUrls2.size());
+
Assertions.assertTrue(serviceUrls2.get(0).getIp().contains("30.10."));
+ List<URL> serviceUrls3 = listener.getAddresses(service3 +
":dubbo", consumerURL);
+ Assertions.assertEquals(2, serviceUrls3.size());
+
Assertions.assertTrue(serviceUrls3.get(0).getIp().contains("30.10."));
+ }
+ }
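+ // Normal scenario. Multiple applications (app1, app2); checks whether an empty-address notification (boundary condition) resolves to the correct empty address list.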
@Test
- public void testOnEvent() {
+ @Order(4)
+ public void testMultipleAppEmptyNotification() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ serviceNames.add("app2");
+ ServiceInstancesChangedListener listener = new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+
+ try (MockedStatic<MetadataUtils> mockedMetadataUtils =
Mockito.mockStatic(MetadataUtils.class)) {
+ mockedMetadataUtils.when(() ->
MetadataUtils.getMetadataServiceProxy(Mockito.any(),
Mockito.any())).thenReturn(metadataService);
+ // notify app1 instance change
+ ServiceInstancesChangedEvent app1_event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(app1_event);
+
+ // notify app2 instance change
+ ServiceInstancesChangedEvent app2_event = new
ServiceInstancesChangedEvent("app2", app2Instances);
+ listener.onEvent(app2_event);
+
+ // empty notification
+ ServiceInstancesChangedEvent app1_event_again = new
ServiceInstancesChangedEvent("app1", Collections.EMPTY_LIST);
+ listener.onEvent(app1_event_again);
+
+ // check app1 cleared
+ Map<String, List<ServiceInstance>> allInstances =
listener.getAllInstances();
+ Assertions.assertEquals(2, allInstances.size());
+ Assertions.assertEquals(0, allInstances.get("app1").size());
+ Assertions.assertEquals(4, allInstances.get("app2").size());
+
+ Map<String, MetadataInfo> revisionToMetadata =
listener.getRevisionToMetadata();
+ Assertions.assertEquals(2, revisionToMetadata.size());
+ Assertions.assertNull(revisionToMetadata.get("111"));
+ Assertions.assertEquals(metadataInfo_222,
revisionToMetadata.get("222"));
+ Assertions.assertEquals(metadataInfo_333,
revisionToMetadata.get("333"));
+
+ List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ Assertions.assertEquals(4, serviceUrls.size());
+
Assertions.assertTrue(serviceUrls.get(0).getIp().contains("30.10."));
+ List<URL> serviceUrls2 = listener.getAddresses(service2 +
":dubbo", consumerURL);
+ Assertions.assertEquals(4, serviceUrls2.size());
+
Assertions.assertTrue(serviceUrls2.get(0).getIp().contains("30.10."));
+ List<URL> serviceUrls3 = listener.getAddresses(service3 +
":dubbo", consumerURL);
+ Assertions.assertEquals(2, serviceUrls3.size());
+
Assertions.assertTrue(serviceUrls3.get(0).getIp().contains("30.10."));
+
+ // app2 empty notification
+ ServiceInstancesChangedEvent app2_event_again = new
ServiceInstancesChangedEvent("app2", Collections.EMPTY_LIST);
+ listener.onEvent(app2_event_again);
+
+ // check app2 cleared
+ Map<String, List<ServiceInstance>> allInstances_app2 =
listener.getAllInstances();
+ Assertions.assertEquals(2, allInstances_app2.size());
+ Assertions.assertEquals(0, allInstances_app2.get("app1").size());
+ Assertions.assertEquals(0, allInstances_app2.get("app2").size());
+
+ Map<String, MetadataInfo> revisionToMetadata_app2 =
listener.getRevisionToMetadata();
+ Assertions.assertEquals(0, revisionToMetadata_app2.size());
+
+ Assertions.assertTrue(isEmpty(listener.getAddresses(service1 +
":dubbo", consumerURL)));
+ Assertions.assertTrue(isEmpty(listener.getAddresses(service2+
":dubbo", consumerURL)));
+ Assertions.assertTrue(isEmpty(listener.getAddresses(service3 +
":dubbo", consumerURL)));
+ }
+ }
+
+ // Normal scenario. Checks the address push flow from the instance listener to the service listeners (Directory).
+ @Test
+ @Order(5)
+ public void testServiceListenerNotification() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ serviceNames.add("app2");
+ ServiceInstancesChangedListener listener = new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+ NotifyListener demoServiceListener =
Mockito.mock(NotifyListener.class);
+ NotifyListener demoService2Listener =
Mockito.mock(NotifyListener.class);
+ listener.addListenerAndNotify(service1 + ":dubbo",
demoServiceListener);
+ listener.addListenerAndNotify(service2 + ":dubbo",
demoService2Listener);
+
+ try (MockedStatic<MetadataUtils> mockedMetadataUtils =
Mockito.mockStatic(MetadataUtils.class)) {
+ mockedMetadataUtils.when(() ->
MetadataUtils.getMetadataServiceProxy(Mockito.any(),
Mockito.any())).thenReturn(metadataService);
+ // notify app1 instance change
+ ServiceInstancesChangedEvent app1_event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(app1_event);
+
+ // check: each service listener was notified exactly once; service1 received 3 URLs, service2 received none
+ ArgumentCaptor<List<URL>> captor =
ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoServiceListener,
Mockito.times(1)).notify(captor.capture());
+ List<URL> notifiedUrls = captor.getValue();
+ Assertions.assertEquals(3, notifiedUrls.size());
+ ArgumentCaptor<List<URL>> captor2 =
ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoService2Listener,
Mockito.times(1)).notify(captor2.capture());
+ List<URL> notifiedUrls2 = captor2.getValue();
+ Assertions.assertEquals(0, notifiedUrls2.size());
+
+ // notify app2 instance change
+ ServiceInstancesChangedEvent app2_event = new
ServiceInstancesChangedEvent("app2", app2Instances);
+ listener.onEvent(app2_event);
+
+ // check: each listener was notified a second time; the latest push holds 7 URLs for service1 and 4 for service2
+ ArgumentCaptor<List<URL>> app2_captor =
ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoServiceListener,
Mockito.times(2)).notify(app2_captor.capture());
+ List<URL> app2_notifiedUrls = app2_captor.getValue();
+ Assertions.assertEquals(7, app2_notifiedUrls.size());
+ ArgumentCaptor<List<URL>> app2_captor2 =
ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoService2Listener,
Mockito.times(2)).notify(app2_captor2.capture());
+ List<URL> app2_notifiedUrls2 = app2_captor2.getValue();
+ Assertions.assertEquals(4, app2_notifiedUrls2.size());
+ }
+
+ // verify that a service listener is still notified when it is added after the instance
notification.
+ NotifyListener demoService3Listener =
Mockito.mock(NotifyListener.class);
+ listener.addListenerAndNotify(service3 + ":dubbo",
demoService3Listener);
+ Mockito.verify(demoService3Listener,
Mockito.times(1)).notify(Mockito.anyList());
+ }
+
+ // Revision failure scenario. On first startup the metadata cannot be fetched at all, so only part of the addresses can be notified.
+ @Test
+ @Order(6)
+ public void testRevisionFailureOnStartup() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ ServiceInstancesChangedListener listener = new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+ try (MockedStatic<MetadataUtils> mockedMetadataUtils =
Mockito.mockStatic(MetadataUtils.class)) {
+ mockedMetadataUtils.when(() ->
MetadataUtils.getMetadataServiceProxy(Mockito.any(),
Mockito.any())).thenReturn(metadataService);
+ // notify app1 instance change using the app1FailedInstances fixture
+ ServiceInstancesChangedEvent failed_revision_event = new
ServiceInstancesChangedEvent("app1", app1FailedInstances);
+ listener.onEvent(failed_revision_event);
+
+ List<URL> serviceUrls = listener.getAddresses(service1 + ":dubbo",
consumerURL);
+ List<URL> serviceUrls2 = listener.getAddresses(service2 +
":dubbo", consumerURL);
+
+ Assertions.assertTrue(isEmpty(serviceUrls));
+ Assertions.assertTrue(isEmpty(serviceUrls2));
+
+ Map<String, MetadataInfo> revisionToMetadata =
listener.getRevisionToMetadata();
+ Assertions.assertEquals(2, revisionToMetadata.size());
+ Assertions.assertEquals(metadataInfo_222,
revisionToMetadata.get("222"));
+ Assertions.assertEquals(MetadataInfo.EMPTY,
revisionToMetadata.get("444"));
+ }
+ }
+
+ // revision 异常场景。运行中地址通知,拿不到revision就用老版本revision
+ @Test
+ public void testRevisionFailureOnNotification() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ serviceNames.add("app2");
+ ServiceInstancesChangedListener listener = new
ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+
+ ConcurrentMap tmpProxyMap = MetadataUtils.metadataServiceProxies;
+
+ try (MockedStatic<MetadataUtils> mockedMetadataUtils =
Mockito.mockStatic(MetadataUtils.class)) {
+ mockedMetadataUtils.when(() ->
MetadataUtils.getMetadataServiceProxy(Mockito.any(),
Mockito.any())).thenReturn(metadataService);
+
+ // notify app1 instance change
+ ServiceInstancesChangedEvent event = new
ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(event);
+
+
Mockito.when(metadataService.getMetadataInfo("222")).thenAnswer(new
Answer<MetadataInfo>() {
+ @Override
+ public MetadataInfo answer(InvocationOnMock invocationOnMock)
throws Throwable {
+ if
(Thread.currentThread().getName().contains("Dubbo-metadata-retry")) {
+ return metadataInfo_222;
+ }
+ return null;
+ }
+ });
+//
Mockito.when(metadataService.getMetadataInfo("444")).thenAnswer(new
Answer<MetadataInfo>() {
+// @Override
+// public MetadataInfo answer(InvocationOnMock
invocationOnMock) throws Throwable {
+// if
(Thread.currentThread().getName().contains("Dubbo-metadata-retry")) {
+// return metadataInfo_444;
+// }
+// return null;
+// }
+// });
+
+ ServiceInstancesChangedEvent event2 = new
ServiceInstancesChangedEvent("app2", app1FailedInstances2);
+ listener.onEvent(event2);
+
+ // FIXME, manually mock proxy util, for retry task will work on
another thread which makes MockStatic useless.
+ ConcurrentMap map = Mockito.mock(ConcurrentMap.class);
+ Mockito.doReturn(metadataService).when(map).get(Mockito.any());
+
Mockito.doReturn(metadataService).when(map).computeIfAbsent(Mockito.any(),
Mockito.any());
+ MetadataUtils.metadataServiceProxies = map;
+
+ // event2 did not really take effect
+ Map<String, MetadataInfo> revisionToMetadata =
listener.getRevisionToMetadata();
+ Assertions.assertEquals(2, revisionToMetadata.size());
+ Assertions.assertEquals(metadataInfo_111,
revisionToMetadata.get("111"));
+ Assertions.assertEquals(MetadataInfo.EMPTY,
revisionToMetadata.get("222"));
+
+ Assertions.assertEquals(3, listener.getAddresses(service1 +
":dubbo", consumerURL).size());
+ Assertions.assertTrue(isEmpty(listener.getAddresses(service2 +
":dubbo", consumerURL)));
+
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ // check recovered after retry.
+ Map<String, MetadataInfo> revisionToMetadata_after_retry =
listener.getRevisionToMetadata();
+ Assertions.assertEquals(2, revisionToMetadata_after_retry.size());
+ Assertions.assertEquals(metadataInfo_111,
revisionToMetadata_after_retry.get("111"));
+ Assertions.assertEquals(metadataInfo_222,
revisionToMetadata_after_retry.get("222"));
+
+ List<URL> serviceUrls_after_retry = listener.getAddresses(service1
+ ":dubbo", consumerURL);
+ Assertions.assertEquals(5, serviceUrls_after_retry.size());
+ List<URL> serviceUrls2_after_retry =
listener.getAddresses(service2 + ":dubbo", consumerURL);
+ Assertions.assertEquals(2, serviceUrls2_after_retry.size());
+ } finally {
+ MetadataUtils.metadataServiceProxies = tmpProxyMap;
+ }
+ }
+
+ /**
+  * Builds one enabled, healthy {@link DefaultServiceInstance} per raw provider URL.
+  * <p>
+  * Host and port are taken from the parsed URL, the registry cluster is set to "default", and
+  * the URL's {@code REVISION_KEY} parameter is stored in the instance metadata under
+  * {@code EXPORTED_SERVICES_REVISION_PROPERTY_NAME}.
+  *
+  * @param rawURls raw URL strings; declared as {@code Object} but every element is cast to {@code String}
+  * @return the instances, in the same order as {@code rawURls}
+  * @throws ClassCastException if an element of {@code rawURls} is not a {@code String}
+  */
+ static List<ServiceInstance> buildInstances(List<Object> rawURls) {
+     List<ServiceInstance> instances = new ArrayList<>();
+
+     for (Object obj : rawURls) {
+         String rawURL = (String) obj;
+         URL dubboUrl = URL.valueOf(rawURL);
+
+         // exactly one instance per URL (the previous code allocated a second, discarded one)
+         DefaultServiceInstance instance = new DefaultServiceInstance();
+         instance.setRawAddress(rawURL);
+         instance.setHost(dubboUrl.getHost());
+         instance.setEnabled(true);
+         instance.setHealthy(true);
+         instance.setPort(dubboUrl.getPort());
+         instance.setRegistryCluster("default");
+
+         // the revision travels as a URL parameter and is exposed through the instance metadata
+         Map<String, String> metadata = new HashMap<>();
+         metadata.put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, dubboUrl.getParameter(REVISION_KEY));
+         instance.setMetadata(metadata);
+
+         instances.add(instance);
+     }
+     return instances;
}
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleListenerTest.java
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleListenerTest.java
index 08582c3..5b4a8a3 100644
---
a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleListenerTest.java
+++
b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/migration/MigrationRuleListenerTest.java
@@ -56,12 +56,13 @@ public class MigrationRuleListenerTest {
Mockito.when(consumerURL.getParameter("timestamp")).thenReturn("1");
System.setProperty("dubbo.application.migration.delay", "1000");
- MigrationRuleHandler handler =
Mockito.mock(MigrationRuleHandler.class);
+ MigrationRuleHandler<?> handler =
Mockito.mock(MigrationRuleHandler.class,
Mockito.withSettings().verboseLogging());
MigrationRuleListener migrationRuleListener = new
MigrationRuleListener();
- MigrationInvoker migrationInvoker =
Mockito.mock(MigrationInvoker.class);
+ MigrationInvoker<?> migrationInvoker =
Mockito.mock(MigrationInvoker.class);
migrationRuleListener.getHandlers().put(migrationInvoker, handler);
+ Thread.sleep(5000);
Mockito.verify(handler,
Mockito.timeout(5000)).doMigrate(Mockito.any());
migrationRuleListener.onRefer(null, migrationInvoker, consumerURL,
null);
diff --git
a/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
b/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
index 6c55f8d..be8ab9e 100644
---
a/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
+++
b/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
@@ -150,6 +150,7 @@ public class DNSServiceDiscoveryTest {
dnsServiceDiscovery.addServiceInstancesChangedListener(changedListener);
ArgumentCaptor<ServiceInstancesChangedEvent> argument =
ArgumentCaptor.forClass(ServiceInstancesChangedEvent.class);
+ Thread.sleep(1000);
Mockito.verify(changedListener,
Mockito.timeout(1000)).onEvent(argument.capture());
assertEquals("c",
argument.getValue().getServiceInstances().get(0).getMetadata("a"));
diff --git a/pom.xml b/pom.xml
index ad52d63..395cce0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -193,6 +193,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>${mockito_version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>cglib</groupId>
<artifactId>cglib-nodep</artifactId>
<version>${cglib_version}</version>