This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 87fe2e7 Submit Dubbo 3 Specify Address module (#102)
87fe2e7 is described below
commit 87fe2e78ce404776cfa2944a0f63d10b3d4f112d
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Mar 10 19:06:19 2022 +0800
Submit Dubbo 3 Specify Address module (#102)
* Submit Dubbo 3 Specify Address module
* add header
* fix version
---
.../pom.xml | 26 +-
.../dubbo/rpc/cluster/specifyaddress/Address.java | 111 +++++++
.../rpc/cluster/specifyaddress/InvokerCache.java | 37 +++
.../specifyaddress/UserSpecifiedAddressRouter.java | 203 +++++++++++++
.../UserSpecifiedAddressRouterFactory.java | 31 ++
.../specifyaddress/UserSpecifiedAddressUtil.java | 41 +++
.../org.apache.dubbo.rpc.cluster.RouterFactory | 1 +
.../rpc/cluster/specifyaddress/DemoService.java | 20 ++
.../cluster/specifyaddress/InvokerCacheTest.java | 34 +++
.../UserSpecifiedAddressRouterFactoryTest.java | 35 +++
.../UserSpecifiedAddressRouterTest.java | 141 +++++++++
.../UserSpecifiedAddressUtilTest.java | 33 +++
.../pom.xml | 26 +-
.../dubbo/rpc/cluster/specifyaddress/Address.java | 111 +++++++
.../DefaultUserSpecifiedServiceAddressBuilder.java | 81 +++++
.../rpc/cluster/specifyaddress/InvokerCache.java | 37 +++
.../specifyaddress/UserSpecifiedAddressRouter.java | 330 +++++++++++++++++++++
.../UserSpecifiedAddressRouterFactory.java | 31 ++
.../specifyaddress/UserSpecifiedAddressUtil.java | 41 +++
.../UserSpecifiedServiceAddressBuilder.java | 33 +++
...bbo.rpc.cluster.router.state.StateRouterFactory | 1 +
...ecifyaddress.UserSpecifiedServiceAddressBuilder | 1 +
...aultUserSpecifiedServiceAddressBuilderTest.java | 171 +++++++++++
.../rpc/cluster/specifyaddress/DemoService.java | 20 ++
.../cluster/specifyaddress/InvokerCacheTest.java | 34 +++
.../UserSpecifiedAddressRouterFactoryTest.java | 38 +++
.../UserSpecifiedAddressRouterTest.java | 232 +++++++++++++++
.../UserSpecifiedAddressUtilTest.java | 33 +++
dubbo-cluster-extensions/pom.xml | 2 +
29 files changed, 1907 insertions(+), 28 deletions(-)
diff --git a/dubbo-cluster-extensions/pom.xml
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/pom.xml
similarity index 69%
copy from dubbo-cluster-extensions/pom.xml
copy to dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/pom.xml
index 4bfd100..10af7a8 100644
--- a/dubbo-cluster-extensions/pom.xml
+++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/pom.xml
@@ -19,24 +19,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>dubbo-cluster-extensions</artifactId>
<groupId>org.apache.dubbo.extensions</groupId>
- <artifactId>dubbo-extensions-dependencies-bom</artifactId>
-
<relativePath>../dubbo-extensions-dependencies-bom/pom.xml</relativePath>
- <version>1.0.0</version>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dubbo-cluster-extensions</artifactId>
- <version>${revision}</version>
- <packaging>pom</packaging>
-
- <properties>
- <revision>3.0.0-SNAPSHOT</revision>
- </properties>
- <modules>
- <module>dubbo-cluster-broadcast-1</module>
- <module>dubbo-cluster-loadbalance-peakewma</module>
- </modules>
+ <artifactId>dubbo-cluster-specify-address-dubbo2</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ <version>2.7.14</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/Address.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/Address.java
new file mode 100644
index 0000000..b4268bf
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/Address.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class Address implements Serializable {
+ // ip - priority: 3
+ private String ip;
+
+ // ip+port - priority: 2
+ private int port;
+
+ // address - priority: 1
+ private URL urlAddress;
+ private boolean needToCreate = false;
+
+ public Address(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
+ this.urlAddress = null;
+ }
+
+ public Address(String ip, int port, boolean needToCreate) {
+ this.ip = ip;
+ this.port = port;
+ this.needToCreate = needToCreate;
+ }
+
+ public Address(URL address) {
+ this.ip = null;
+ this.port = 0;
+ this.urlAddress = address;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public URL getUrlAddress() {
+ return urlAddress;
+ }
+
+ public void setUrlAddress(URL urlAddress) {
+ this.urlAddress = urlAddress;
+ }
+
+ public boolean isNeedToCreate() {
+ return needToCreate;
+ }
+
+ public void setNeedToCreate(boolean needToCreate) {
+ this.needToCreate = needToCreate;
+ }
+
+ @Override
+ public String toString() {
+ return "Address{" +
+ "ip='" + ip + '\'' +
+ ", port=" + port +
+ ", address=" + urlAddress +
+ ", needToCreate=" + needToCreate +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Address address = (Address) o;
+ return port == address.port && needToCreate == address.needToCreate &&
Objects.equals(ip, address.ip) && Objects.equals(urlAddress,
address.urlAddress);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ip, port, urlAddress, needToCreate);
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCache.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCache.java
new file mode 100644
index 0000000..52aff89
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCache.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.rpc.Invoker;
+
+public class InvokerCache<T> {
+ private long lastAccess = System.currentTimeMillis();
+ private final Invoker<T> invoker;
+
+ public InvokerCache(Invoker<T> invoker) {
+ this.invoker = invoker;
+ }
+
+ public long getLastAccess() {
+ return lastAccess;
+ }
+
+ public Invoker<T> getInvoker() {
+ lastAccess = System.currentTimeMillis();
+ return invoker;
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
new file mode 100644
index 0000000..7c8bd6e
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.router.AbstractRouter;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class UserSpecifiedAddressRouter extends AbstractRouter {
+ private final static Logger logger =
LoggerFactory.getLogger(UserSpecifiedAddressRouter.class);
+ // protected for ut purpose
+ protected static int EXPIRE_TIME = 10 * 60 * 1000;
+
+ private volatile List<Invoker<?>> invokers = Collections.emptyList();
+ private volatile Map<String, Invoker<?>> ip2Invoker;
+ private volatile Map<String, Invoker<?>> address2Invoker;
+
+ private final Lock cacheLock = new ReentrantLock();
+
+ public UserSpecifiedAddressRouter(URL referenceUrl) {
+ super(referenceUrl);
+ }
+
+ @Override
+ public <T> void notify(List<Invoker<T>> invokers) {
+ this.invokers = (List) invokers;
+ // do not build cache until first Specify Invoke happened
+ if (ip2Invoker != null) {
+ ip2Invoker = processIp((List) invokers);
+ address2Invoker = processAddress((List) invokers);
+ }
+ }
+
+ @Override
+ public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url,
Invocation invocation) throws RpcException {
+ Address address = UserSpecifiedAddressUtil.getAddress();
+
+ // 1. check if set address in ThreadLocal
+ if (address == null) {
+ return invokers;
+ }
+
+ List<Invoker<T>> result = new LinkedList<>();
+
+ // 2. check if set address url
+ if (address.getUrlAddress() != null) {
+ Invoker<?> invoker = getInvokerByURL(address, invocation);
+ result.add((Invoker) invoker);
+ return result;
+ }
+
+ // 3. check if set ip and port
+ if (StringUtils.isNotEmpty(address.getIp())) {
+ Invoker<?> invoker = getInvokerByIp(address, invocation);
+ result.add((Invoker) invoker);
+ return result;
+ }
+
+ return invokers;
+ }
+
+ private Invoker<?> getInvokerByURL(Address address, Invocation invocation)
{
+ tryLoadSpecifiedMap();
+
+ // try to find in directory
+ URL urlAddress = address.getUrlAddress();
+ String targetAddress = urlAddress.getHost() + ":" +
urlAddress.getPort();
+ Invoker<?> invoker = address2Invoker.get(targetAddress);
+ if (invoker != null) {
+ AtomicBoolean match = new AtomicBoolean(true);
+ if (StringUtils.isNotEmpty(urlAddress.getProtocol())) {
+
match.set(invoker.getUrl().getProtocol().equals(urlAddress.getProtocol()));
+ }
+ if (match.get()) {
+ urlAddress.getParameters().forEach((k, v) -> {
+ if (match.get()) {
+ match.set(v.equals(invoker.getUrl().getParameter(k)));
+ }
+ });
+ }
+ if (match.get()) {
+ return invoker;
+ }
+ }
+
+ // create new one
+ throw new RpcException("User specified server address not support
refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use
dubbo-cluster-specify-address-dubbo3.");
+ }
+
+ public Invoker<?> getInvokerByIp(Address address, Invocation invocation) {
+ tryLoadSpecifiedMap();
+
+ String ip = address.getIp();
+ int port = address.getPort();
+
+ Invoker<?> targetInvoker;
+ if (port != 0) {
+ targetInvoker = address2Invoker.get(ip + ":" + port);
+ if (targetInvoker != null) {
+ return targetInvoker;
+ }
+ } else {
+ targetInvoker = ip2Invoker.get(ip);
+ if (targetInvoker != null) {
+ return targetInvoker;
+ }
+ }
+
+ if (!address.isNeedToCreate()) {
+ throwException(invocation, address);
+ }
+
+ throw new RpcException("User specified server address not support
refer new url in Dubbo 2.x. Please upgrade to Dubbo 3.x and use
dubbo-cluster-specify-address-dubbo3.");
+ }
+
+ private void throwException(Invocation invocation, Address address) {
+ throw new RpcException("user specified server address : [" + address +
"] is not a valid provider for service: ["
+ + getUrl().getServiceKey() + "]");
+ }
+
+
+ private Map<String, Invoker<?>> processIp(List<Invoker<?>> invokerList) {
+ Map<String, Invoker<?>> ip2Invoker = new HashMap<>();
+ for (Invoker<?> invoker : invokerList) {
+ ip2Invoker.put(invoker.getUrl().getHost(), invoker);
+ }
+ return Collections.unmodifiableMap(ip2Invoker);
+ }
+
+ private Map<String, Invoker<?>> processAddress(List<Invoker<?>> addresses)
{
+ Map<String, Invoker<?>> address2Invoker = new HashMap<>();
+ for (Invoker<?> invoker : addresses) {
+ address2Invoker.put(invoker.getUrl().getHost() + ":" +
invoker.getUrl().getPort(), invoker);
+ }
+ return Collections.unmodifiableMap(address2Invoker);
+ }
+
+ // For ut only
+ @Deprecated
+ protected Map<String, Invoker<?>> getIp2Invoker() {
+ return ip2Invoker;
+ }
+
+ // For ut only
+ @Deprecated
+ protected Map<String, Invoker<?>> getAddress2Invoker() {
+ return address2Invoker;
+ }
+
+ // For ut only
+ @Deprecated
+ protected List<Invoker<?>> getInvokers() {
+ return invokers;
+ }
+
+ private void tryLoadSpecifiedMap() {
+ if (ip2Invoker != null) {
+ return;
+ }
+ synchronized (this) {
+ if (ip2Invoker != null) {
+ return;
+ }
+ List<Invoker<?>> invokers = this.invokers;
+ if (CollectionUtils.isEmpty(invokers)) {
+ address2Invoker = Collections.unmodifiableMap(new HashMap<>());
+ ip2Invoker = Collections.unmodifiableMap(new HashMap<>());
+ return;
+ }
+ address2Invoker = processAddress(invokers);
+ ip2Invoker = processIp(invokers);
+ }
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactory.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactory.java
new file mode 100644
index 0000000..38c9243
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.cluster.Router;
+import org.apache.dubbo.rpc.cluster.RouterFactory;
+
+@Activate(order = 10)
+public class UserSpecifiedAddressRouterFactory implements RouterFactory {
+
+ @Override
+ public Router getRouter(URL url) {
+ return new UserSpecifiedAddressRouter(url);
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtil.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtil.java
new file mode 100644
index 0000000..61a6ecb
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.threadlocal.InternalThreadLocal;
+
+public class UserSpecifiedAddressUtil {
+ private final static InternalThreadLocal<Address> ADDRESS = new
InternalThreadLocal<>();
+
+ /**
+ * Set specified address to next invoke
+ *
+ * @param address specified address
+ */
+ public static void setAddress(Address address) {
+ ADDRESS.set(address);
+ }
+
+ public static Address getAddress() {
+ try {
+ return ADDRESS.get();
+ } finally {
+ // work once
+ ADDRESS.remove();
+ }
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.RouterFactory
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.RouterFactory
new file mode 100644
index 0000000..e2f71d9
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.RouterFactory
@@ -0,0 +1 @@
+user-specified-address=org.apache.dubbo.rpc.cluster.specifyaddress.UserSpecifiedAddressRouterFactory
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DemoService.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DemoService.java
new file mode 100644
index 0000000..a315edb
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DemoService.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+public interface DemoService {
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCacheTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCacheTest.java
new file mode 100644
index 0000000..47fc38d
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCacheTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.rpc.Invoker;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class InvokerCacheTest {
+ @Test
+ public void test() throws InterruptedException {
+ InvokerCache<Object> cache = new
InvokerCache<>(Mockito.mock(Invoker.class));
+ long originTime = cache.getLastAccess();
+ Thread.sleep(5);
+ cache.getInvoker();
+ Assertions.assertNotEquals(originTime, cache.getLastAccess());
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactoryTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactoryTest.java
new file mode 100644
index 0000000..f3cbc88
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactoryTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.rpc.cluster.RouterFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class UserSpecifiedAddressRouterFactoryTest {
+ @Test
+ public void test() {
+
+ RouterFactory stateRouterFactory =
ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension("user-specified-address");
+ Assertions.assertEquals(UserSpecifiedAddressRouterFactory.class,
stateRouterFactory.getClass());
+
+ stateRouterFactory.getRouter(URL.valueOf("127.0.0.1"));
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
new file mode 100644
index 0000000..372eb0f
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+public class UserSpecifiedAddressRouterTest {
+ private ApplicationModel applicationModel;
+ private URL consumerUrl;
+
+ @BeforeEach
+ public void setup() {
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value").addParameter("check", "false")
+ .addParameter("version", "1.0.0").addParameter("group", "Dubbo");
+ }
+
+ @Test
+ public void testNotify() {
+ UserSpecifiedAddressRouter userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter(consumerUrl);
+ Assertions.assertEquals(Collections.emptyList(),
userSpecifiedAddressRouter.getInvokers());
+ Assertions.assertNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNull(userSpecifiedAddressRouter.getIp2Invoker());
+ userSpecifiedAddressRouter.notify(Collections.emptyList());
+ Assertions.assertEquals(Collections.emptyList(),
userSpecifiedAddressRouter.getInvokers());
+ Assertions.assertNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNull(userSpecifiedAddressRouter.getIp2Invoker());
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.1", 0));
+
+ // no address
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+
+
Assertions.assertNotNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNotNull(userSpecifiedAddressRouter.getIp2Invoker());
+
+ userSpecifiedAddressRouter.notify(Collections.emptyList());
+ Assertions.assertEquals(Collections.emptyList(),
userSpecifiedAddressRouter.getInvokers());
+
Assertions.assertNotNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNotNull(userSpecifiedAddressRouter.getIp2Invoker());
+ }
+
+ @Test
+ public void testGetInvokerByURL() {
+ UserSpecifiedAddressRouter userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter(consumerUrl);
+
+ Assertions.assertEquals(Collections.emptyList(),
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880")));
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+
+ Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
+
Mockito.when(mockInvoker.getUrl()).thenReturn(URL.valueOf("simple://127.0.0.1:20880?Test1=Value"));
+
+ userSpecifiedAddressRouter.notify(new
LinkedList<>(Collections.singletonList(mockInvoker)));
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880")));
+ List<Invoker<Object>> invokers = userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ userSpecifiedAddressRouter.notify(new
LinkedList<>(Collections.singletonList(mockInvoker)));
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880?Test1=Value")));
+ invokers = userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ userSpecifiedAddressRouter.notify(new
LinkedList<>(Collections.singletonList(mockInvoker)));
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("simple://127.0.0.1:20880")));
+ invokers = userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880?Test1=Value&Test2=Value&Test3=Value")));
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ }
+
+ @Test
+ public void testGetInvokerByIp() {
+ UserSpecifiedAddressRouter userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter(consumerUrl);
+
+ Assertions.assertEquals(Collections.emptyList(),
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+
+ Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
+ Mockito.when(mockInvoker.getUrl()).thenReturn(consumerUrl);
+
+ userSpecifiedAddressRouter.notify(new
LinkedList<>(Collections.singletonList(mockInvoker)));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 0));
+ List<Invoker<Object>> invokers = userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20880));
+ invokers = userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class));
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770));
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class)));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.3", 20880));
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.route(new
LinkedList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class)));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770,
true));
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.route(Collections.emptyList(),
consumerUrl, Mockito.mock(Invocation.class)));
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtilTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtilTest.java
new file mode 100644
index 0000000..0dc1c16
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo2/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtilTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class UserSpecifiedAddressUtilTest {
+ @Test
+ public void test() {
+ Assertions.assertNull(UserSpecifiedAddressUtil.getAddress());
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.1", 0));
+ Assertions.assertEquals(new Address("127.0.0.1", 0),
UserSpecifiedAddressUtil.getAddress());
+ Assertions.assertNull(UserSpecifiedAddressUtil.getAddress());
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.1", 12345));
+ Assertions.assertNotEquals(new Address("127.0.0.1", 0),
UserSpecifiedAddressUtil.getAddress());
+ Assertions.assertNull(UserSpecifiedAddressUtil.getAddress());
+ }
+}
diff --git a/dubbo-cluster-extensions/pom.xml
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/pom.xml
similarity index 69%
copy from dubbo-cluster-extensions/pom.xml
copy to dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/pom.xml
index 4bfd100..dd8a209 100644
--- a/dubbo-cluster-extensions/pom.xml
+++ b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/pom.xml
@@ -19,24 +19,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>dubbo-cluster-extensions</artifactId>
<groupId>org.apache.dubbo.extensions</groupId>
- <artifactId>dubbo-extensions-dependencies-bom</artifactId>
-
<relativePath>../dubbo-extensions-dependencies-bom/pom.xml</relativePath>
- <version>1.0.0</version>
+ <version>${revision}</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dubbo-cluster-extensions</artifactId>
- <version>${revision}</version>
- <packaging>pom</packaging>
-
- <properties>
- <revision>3.0.0-SNAPSHOT</revision>
- </properties>
- <modules>
- <module>dubbo-cluster-broadcast-1</module>
- <module>dubbo-cluster-loadbalance-peakewma</module>
- </modules>
+ <artifactId>dubbo-cluster-specify-address-dubbo3</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ <version>3.0.6</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/Address.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/Address.java
new file mode 100644
index 0000000..b4268bf
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/Address.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class Address implements Serializable {
+ // ip - priority: 3
+ private String ip;
+
+ // ip+port - priority: 2
+ private int port;
+
+ // address - priority: 1
+ private URL urlAddress;
+ private boolean needToCreate = false;
+
+ public Address(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
+ this.urlAddress = null;
+ }
+
+ public Address(String ip, int port, boolean needToCreate) {
+ this.ip = ip;
+ this.port = port;
+ this.needToCreate = needToCreate;
+ }
+
+ public Address(URL address) {
+ this.ip = null;
+ this.port = 0;
+ this.urlAddress = address;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public URL getUrlAddress() {
+ return urlAddress;
+ }
+
+ public void setUrlAddress(URL urlAddress) {
+ this.urlAddress = urlAddress;
+ }
+
+ public boolean isNeedToCreate() {
+ return needToCreate;
+ }
+
+ public void setNeedToCreate(boolean needToCreate) {
+ this.needToCreate = needToCreate;
+ }
+
+ @Override
+ public String toString() {
+ return "Address{" +
+ "ip='" + ip + '\'' +
+ ", port=" + port +
+ ", address=" + urlAddress +
+ ", needToCreate=" + needToCreate +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Address address = (Address) o;
+ return port == address.port && needToCreate == address.needToCreate &&
Objects.equals(ip, address.ip) && Objects.equals(urlAddress,
address.urlAddress);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ip, port, urlAddress, needToCreate);
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/DefaultUserSpecifiedServiceAddressBuilder.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/DefaultUserSpecifiedServiceAddressBuilder.java
new file mode 100644
index 0000000..65f6d26
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/DefaultUserSpecifiedServiceAddressBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.url.component.DubboServiceAddressURL;
+import org.apache.dubbo.common.url.component.PathURLAddress;
+import org.apache.dubbo.common.url.component.URLParam;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+public class DefaultUserSpecifiedServiceAddressBuilder implements
UserSpecifiedServiceAddressBuilder {
+ public final static String NAME = "default";
+
+ private final ExtensionLoader<Protocol> protocolExtensionLoader;
+
+ public DefaultUserSpecifiedServiceAddressBuilder(ApplicationModel
applicationModel) {
+ this.protocolExtensionLoader =
applicationModel.getExtensionLoader(Protocol.class);
+ }
+
+ @Override
+ public <T> URL buildAddress(List<Invoker<T>> invokers, Address address,
Invocation invocation, URL consumerUrl) {
+ if (!invokers.isEmpty()) {
+ URL template = invokers.iterator().next().getUrl();
+ template = template.setHost(address.getIp());
+ if (address.getPort() != 0) {
+ template = template.setPort(address.getPort());
+ }
+ return template;
+ } else {
+ String ip = address.getIp();
+ int port = address.getPort();
+ String protocol = consumerUrl.getParameter(PROTOCOL_KEY, DUBBO);
+ if (port == 0) {
+ port =
protocolExtensionLoader.getExtension(protocol).getDefaultPort();
+ }
+ return new DubboServiceAddressURL(
+ new PathURLAddress(protocol, null, null,
consumerUrl.getPath(), ip, port),
+ URLParam.parse(""), consumerUrl, null);
+ }
+ }
+
+ @Override
+ public <T> URL rebuildAddress(List<Invoker<T>> invokers, Address address,
Invocation invocation, URL consumerUrl) {
+ URL url = address.getUrlAddress();
+ Map<String, String> parameters = new HashMap<>(url.getParameters());
+ parameters.put(VERSION_KEY, consumerUrl.getVersion());
+ parameters.put(GROUP_KEY, consumerUrl.getGroup());
+ String protocol = StringUtils.isEmpty(url.getProtocol()) ?
consumerUrl.getParameter(PROTOCOL_KEY, DUBBO) : url.getProtocol();
+ return new DubboServiceAddressURL(
+ new PathURLAddress(protocol, null, null, consumerUrl.getPath(),
url.getHost(), url.getPort()),
+ URLParam.parse(parameters), consumerUrl, null);
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCache.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCache.java
new file mode 100644
index 0000000..52aff89
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCache.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.rpc.Invoker;
+
+public class InvokerCache<T> {
+ private long lastAccess = System.currentTimeMillis();
+ private final Invoker<T> invoker;
+
+ public InvokerCache(Invoker<T> invoker) {
+ this.invoker = invoker;
+ }
+
+ public long getLastAccess() {
+ return lastAccess;
+ }
+
+ public Invoker<T> getInvoker() {
+ lastAccess = System.currentTimeMillis();
+ return invoker;
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
new file mode 100644
index 0000000..a8faeb2
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouter.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.Holder;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.router.RouterSnapshotNode;
+import org.apache.dubbo.rpc.cluster.router.state.AbstractStateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class UserSpecifiedAddressRouter<T> extends AbstractStateRouter<T> {
+ private final static Logger logger =
LoggerFactory.getLogger(UserSpecifiedAddressRouter.class);
+ // protected for ut purpose
+ protected static int EXPIRE_TIME = 10 * 60 * 1000;
+ private final static String USER_SPECIFIED_SERVICE_ADDRESS_BUILDER_KEY =
"userSpecifiedServiceAddressBuilder";
+
+ private volatile BitList<Invoker<T>> invokers = BitList.emptyList();
+ private volatile Map<String, Invoker<T>> ip2Invoker;
+ private volatile Map<String, Invoker<T>> address2Invoker;
+
+ private final Lock cacheLock = new ReentrantLock();
+ private final Map<URL, InvokerCache<T>> newInvokerCache = new
LinkedHashMap<>(16, 0.75f, true);
+
+ private final UserSpecifiedServiceAddressBuilder
userSpecifiedServiceAddressBuilder;
+
+ private final Protocol protocol;
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final AtomicBoolean launchRemovalTask = new AtomicBoolean(false);
+ private volatile ScheduledFuture<?> removalFuture;
+
+ public UserSpecifiedAddressRouter(URL referenceUrl) {
+ super(referenceUrl);
+ this.scheduledExecutorService =
referenceUrl.getScopeModel().getDefaultExtension(ExecutorRepository.class).nextScheduledExecutor();
+ this.protocol =
referenceUrl.getOrDefaultFrameworkModel().getExtensionLoader(Protocol.class).getAdaptiveExtension();
+ this.userSpecifiedServiceAddressBuilder =
referenceUrl.getScopeModel().getExtensionLoader(UserSpecifiedServiceAddressBuilder.class)
+
.getExtension(referenceUrl.getParameter(USER_SPECIFIED_SERVICE_ADDRESS_BUILDER_KEY,
DefaultUserSpecifiedServiceAddressBuilder.NAME));
+ }
+
+ @Override
+ public void notify(BitList<Invoker<T>> invokers) {
+ this.invokers = invokers;
+ // do not build cache until first Specify Invoke happened
+ if (ip2Invoker != null) {
+ ip2Invoker = processIp(invokers);
+ address2Invoker = processAddress(invokers);
+ }
+ }
+
+ @Override
+ protected BitList<Invoker<T>> doRoute(BitList<Invoker<T>> invokers, URL
url, Invocation invocation,
+ boolean needToPrintMessage,
Holder<RouterSnapshotNode<T>> nodeHolder,
+ Holder<String> messageHolder) throws
RpcException {
+ Address address = UserSpecifiedAddressUtil.getAddress();
+
+ // 1. check if set address in ThreadLocal
+ if (address == null) {
+ if (needToPrintMessage) {
+ messageHolder.set("No address specified, continue.");
+ }
+ return continueRoute(invokers, url, invocation,
needToPrintMessage, nodeHolder);
+ }
+
+ BitList<Invoker<T>> result = new BitList<>(invokers, true);
+
+ // 2. check if set address url
+ if (address.getUrlAddress() != null) {
+ Invoker<T> invoker = getInvokerByURL(address, invocation);
+ result.add(invoker);
+ if (needToPrintMessage) {
+ messageHolder.set("URL Address has been set. URL Address: " +
address.getUrlAddress());
+ }
+ return result;
+ }
+
+ // 3. check if set ip and port
+ if (StringUtils.isNotEmpty(address.getIp())) {
+ Invoker<T> invoker = getInvokerByIp(address, invocation);
+ if (invoker != null) {
+ result.add(invoker);
+ if (needToPrintMessage) {
+ messageHolder.set("Target Ip has been set and address can
be found in directory. Target Ip: " + address.getIp() + " Port: " +
address.getPort());
+ }
+ return result;
+ } // target ip is not contains in directory
+
+ if (address.isNeedToCreate()) {
+ invoker = createInvoker(address, invocation);
+ result.add(invoker);
+ if (needToPrintMessage) {
+ messageHolder.set("Target Ip has been set and address
cannot be found in directory, build new one. Target Ip: " + address.getIp() + "
Port: " + address.getPort());
+ }
+ return result;
+ }
+ }
+
+ if (needToPrintMessage) {
+ messageHolder.set("Target Address has not been set.");
+ }
+ return continueRoute(invokers, url, invocation, needToPrintMessage,
nodeHolder);
+ }
+
+ @Override
+ protected boolean supportContinueRoute() {
+ return true;
+ }
+
+ private Invoker<T> getInvokerByURL(Address address, Invocation invocation)
{
+ tryLoadSpecifiedMap();
+
+ // try to find in directory
+ URL urlAddress = address.getUrlAddress();
+ String targetAddress = urlAddress.getHost() + ":" +
urlAddress.getPort();
+ Invoker<T> invoker = address2Invoker.get(targetAddress);
+ if (invoker != null) {
+ AtomicBoolean match = new AtomicBoolean(true);
+ if (StringUtils.isNotEmpty(urlAddress.getProtocol())) {
+
match.set(invoker.getUrl().getProtocol().equals(urlAddress.getProtocol()));
+ }
+ if (match.get()) {
+ urlAddress.getParameters().forEach((k, v) -> {
+ if (match.get()) {
+ match.set(v.equals(invoker.getUrl().getParameter(k)));
+ }
+ });
+ }
+ if (match.get()) {
+ return invoker;
+ }
+ }
+
+ // create new one
+ URL url = userSpecifiedServiceAddressBuilder.rebuildAddress(invokers,
address, invocation, getUrl());
+ return getOrBuildInvokerCache(url);
+ }
+
+ private Invoker<T> getOrBuildInvokerCache(URL url) {
+ logger.info("Unable to find a proper invoker from directory. Try to
create new invoker. New URL: " + url);
+
+ InvokerCache<T> cache;
+ cacheLock.lock();
+ try {
+ cache = newInvokerCache.get(url);
+ } finally {
+ cacheLock.unlock();
+ }
+ if (cache == null) {
+ Invoker<T> invoker = refer(url);
+ cacheLock.lock();
+ try {
+ cache = newInvokerCache.get(url);
+ if (cache == null) {
+ cache = new InvokerCache<>(invoker);
+ newInvokerCache.put(url, cache);
+ if (launchRemovalTask.compareAndSet(false, true)) {
+ removalFuture =
scheduledExecutorService.scheduleAtFixedRate(new RemovalTask(), EXPIRE_TIME /
2, EXPIRE_TIME / 2, TimeUnit.MILLISECONDS);
+ }
+ } else {
+ invoker.destroy();
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+ return cache.getInvoker();
+ }
+
+ public Invoker<T> getInvokerByIp(Address address, Invocation invocation) {
+ tryLoadSpecifiedMap();
+
+ String ip = address.getIp();
+ int port = address.getPort();
+
+ Invoker<T> targetInvoker;
+ if (port != 0) {
+ targetInvoker = address2Invoker.get(ip + ":" + port);
+ if (targetInvoker != null) {
+ return targetInvoker;
+ }
+ } else {
+ targetInvoker = ip2Invoker.get(ip);
+ if (targetInvoker != null) {
+ return targetInvoker;
+ }
+ }
+
+ if (!address.isNeedToCreate()) {
+ throwException(invocation, address);
+ }
+
+ return null;
+ }
+
+ public Invoker<T> createInvoker(Address address, Invocation invocation) {
+ return
getOrBuildInvokerCache(userSpecifiedServiceAddressBuilder.buildAddress(invokers,
address, invocation, getUrl()));
+ }
+
+ private Invoker<T> refer(URL url) {
+ return (Invoker<T>)
protocol.refer(getUrl().getServiceModel().getServiceInterfaceClass(), url);
+ }
+
+ private void throwException(Invocation invocation, Address address) {
+ throw new RpcException("user specified server address : [" + address +
"] is not a valid provider for service: ["
+ + getUrl().getServiceKey() + "]");
+ }
+
+ private Map<String, Invoker<T>> processIp(List<Invoker<T>> invokerList) {
+ Map<String, Invoker<T>> ip2Invoker = new HashMap<>();
+ for (Invoker<T> invoker : invokerList) {
+ ip2Invoker.put(invoker.getUrl().getHost(), invoker);
+ }
+ return Collections.unmodifiableMap(ip2Invoker);
+ }
+
+ private Map<String, Invoker<T>> processAddress(List<Invoker<T>> addresses)
{
+ Map<String, Invoker<T>> address2Invoker = new HashMap<>();
+ for (Invoker<T> invoker : addresses) {
+ address2Invoker.put(invoker.getUrl().getHost() + ":" +
invoker.getUrl().getPort(), invoker);
+ }
+ return Collections.unmodifiableMap(address2Invoker);
+ }
+
+ // For ut only
+ @Deprecated
+ protected Map<String, Invoker<T>> getIp2Invoker() {
+ return ip2Invoker;
+ }
+
+ // For ut only
+ @Deprecated
+ protected Map<String, Invoker<T>> getAddress2Invoker() {
+ return address2Invoker;
+ }
+
+ // For ut only
+ @Deprecated
+ protected BitList<Invoker<T>> getInvokers() {
+ return invokers;
+ }
+
+ // For ut only
+ @Deprecated
+ protected Map<URL, InvokerCache<T>> getNewInvokerCache() {
+ return newInvokerCache;
+ }
+
+ private void tryLoadSpecifiedMap() {
+ if (ip2Invoker != null) {
+ return;
+ }
+ synchronized (this) {
+ if (ip2Invoker != null) {
+ return;
+ }
+ BitList<Invoker<T>> invokers = this.invokers;
+ if (CollectionUtils.isEmpty(invokers)) {
+ address2Invoker = Collections.unmodifiableMap(new HashMap<>());
+ ip2Invoker = Collections.unmodifiableMap(new HashMap<>());
+ return;
+ }
+ address2Invoker = processAddress(invokers);
+ ip2Invoker = processIp(invokers);
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (removalFuture != null) {
+ removalFuture.cancel(false);
+ }
+ }
+
+ private class RemovalTask implements Runnable {
+ @Override
+ public void run() {
+ cacheLock.lock();
+ try {
+ if (newInvokerCache.size() > 0) {
+ Iterator<Map.Entry<URL, InvokerCache<T>>> iterator =
newInvokerCache.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<URL, InvokerCache<T>> entry =
iterator.next();
+ if (System.currentTimeMillis() -
entry.getValue().getLastAccess() > EXPIRE_TIME) {
+ iterator.remove();
+ entry.getValue().getInvoker().destroy();
+ } else {
+ break;
+ }
+ }
+ }
+ } finally {
+ cacheLock.unlock();
+ }
+ }
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactory.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactory.java
new file mode 100644
index 0000000..f74ff71
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouter;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouterFactory;
+
+@Activate(order = 10)
+public class UserSpecifiedAddressRouterFactory implements StateRouterFactory {
+
+ @Override
+ public <T> StateRouter<T> getRouter(Class<T> interfaceClass, URL url) {
+ return new UserSpecifiedAddressRouter<>(url);
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtil.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtil.java
new file mode 100644
index 0000000..61a6ecb
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.threadlocal.InternalThreadLocal;
+
+public class UserSpecifiedAddressUtil {
+ private final static InternalThreadLocal<Address> ADDRESS = new
InternalThreadLocal<>();
+
+ /**
+ * Set specified address to next invoke
+ *
+ * @param address specified address
+ */
+ public static void setAddress(Address address) {
+ ADDRESS.set(address);
+ }
+
+ public static Address getAddress() {
+ try {
+ return ADDRESS.get();
+ } finally {
+ // work once
+ ADDRESS.remove();
+ }
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedServiceAddressBuilder.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedServiceAddressBuilder.java
new file mode 100644
index 0000000..49e901e
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedServiceAddressBuilder.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionScope;
+import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+
+import java.util.List;
+
+@SPI(scope = ExtensionScope.APPLICATION)
+public interface UserSpecifiedServiceAddressBuilder {
+
+ <T> URL buildAddress(List<Invoker<T>> invokers, Address address,
Invocation invocation, URL consumerUrl);
+
+ <T> URL rebuildAddress(List<Invoker<T>> invokers, Address address,
Invocation invocation, URL consumerUrl);
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.router.state.StateRouterFactory
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.router.state.StateRouterFactory
new file mode 100644
index 0000000..e2f71d9
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.router.state.StateRouterFactory
@@ -0,0 +1 @@
+user-specified-address=org.apache.dubbo.rpc.cluster.specifyaddress.UserSpecifiedAddressRouterFactory
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.specifyaddress.UserSpecifiedServiceAddressBuilder
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.specifyaddress.UserSpecifiedServiceAddressBuilder
new file mode 100644
index 0000000..756631b
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.cluster.specifyaddress.UserSpecifiedServiceAddressBuilder
@@ -0,0 +1 @@
+default=org.apache.dubbo.rpc.cluster.specifyaddress.DefaultUserSpecifiedServiceAddressBuilder
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DefaultUserSpecifiedServiceAddressBuilderTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DefaultUserSpecifiedServiceAddressBuilderTest.java
new file mode 100644
index 0000000..4d46f52
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DefaultUserSpecifiedServiceAddressBuilderTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ServiceModel;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class DefaultUserSpecifiedServiceAddressBuilderTest {
+ @Test
+ public void testBuild() {
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+
+ DefaultUserSpecifiedServiceAddressBuilder
defaultUserSpecifiedServiceAddressBuilder = new
DefaultUserSpecifiedServiceAddressBuilder(applicationModel);
+
+ Address address = new Address("127.0.0.1", 0);
+ ServiceModel serviceModel = Mockito.mock(ServiceModel.class);
+ URL consumerUrl = URL.valueOf("").addParameter("Test",
"Value").setScopeModel(applicationModel).setServiceModel(serviceModel);
+ URL url =
defaultUserSpecifiedServiceAddressBuilder.buildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(20880, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ address = new Address("127.0.0.1", 20770);
+ url =
defaultUserSpecifiedServiceAddressBuilder.buildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(20770, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ URL invokerUrl = URL.valueOf("127.0.0.2:20660").addParameter("Test1",
"Value1").setScopeModel(applicationModel).setServiceModel(serviceModel);
+ Invoker invoker = Mockito.mock(Invoker.class);
+ Mockito.when(invoker.getUrl()).thenReturn(invokerUrl);
+
+ address = new Address("127.0.0.1", 20770);
+ url =
defaultUserSpecifiedServiceAddressBuilder.buildAddress(Collections.singletonList(invoker),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(20770, url.getPort());
+ Assertions.assertEquals("Value1", url.getParameter("Test1"));
+ Assertions.assertNull(url.getParameter("Test"));
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ address = new Address("127.0.0.1", 0);
+ url =
defaultUserSpecifiedServiceAddressBuilder.buildAddress(Collections.singletonList(invoker),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(20660, url.getPort());
+ Assertions.assertEquals("Value1", url.getParameter("Test1"));
+ Assertions.assertNull(url.getParameter("Test"));
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ applicationModel.destroy();
+ }
+
+ @Test
+ public void testReBuild() {
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+
+ DefaultUserSpecifiedServiceAddressBuilder
defaultUserSpecifiedServiceAddressBuilder = new
DefaultUserSpecifiedServiceAddressBuilder(applicationModel);
+
+ Address address = new
Address(URL.valueOf("127.0.0.1:12345?Test=Value"));
+ ServiceModel serviceModel = Mockito.mock(ServiceModel.class);
+ URL consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value")
+ .addParameter("version", "1.0.0").addParameter("group", "Dubbo")
+ .setScopeModel(applicationModel).setServiceModel(serviceModel);
+ URL url =
defaultUserSpecifiedServiceAddressBuilder.rebuildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(12345, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getVersion(), url.getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(), url.getGroup());
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ address = new Address(URL.valueOf("127.0.0.1:12345?Test=Value1"));
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value")
+ .addParameter("version", "1.0.0").addParameter("group", "Dubbo")
+ .setScopeModel(applicationModel).setServiceModel(serviceModel);
+ url =
defaultUserSpecifiedServiceAddressBuilder.rebuildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(12345, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getVersion(), url.getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(), url.getGroup());
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ address = new Address(URL.valueOf("127.0.0.1:12345?Test1=Value1"));
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value")
+ .addParameter("version", "1.0.0").addParameter("group", "Dubbo")
+ .setScopeModel(applicationModel).setServiceModel(serviceModel);
+ url =
defaultUserSpecifiedServiceAddressBuilder.rebuildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(12345, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals("Value1", url.getParameter("Test1"));
+ Assertions.assertEquals(consumerUrl.getVersion(), url.getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(), url.getGroup());
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ address = new Address(URL.valueOf("127.0.0.1:12345?Test1=Value1"));
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value")
+ .addParameter("version", "1.0.0")
+ .setScopeModel(applicationModel).setServiceModel(serviceModel);
+ url =
defaultUserSpecifiedServiceAddressBuilder.rebuildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(12345, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals("Value1", url.getParameter("Test1"));
+ Assertions.assertEquals(consumerUrl.getVersion(), url.getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(), url.getGroup());
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ address = new Address(URL.valueOf("127.0.0.1:12345?Test1=Value1"));
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value")
+ .addParameter("group", "Dubbo")
+ .setScopeModel(applicationModel).setServiceModel(serviceModel);
+ url =
defaultUserSpecifiedServiceAddressBuilder.rebuildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(12345, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals("Value1", url.getParameter("Test1"));
+ Assertions.assertEquals(consumerUrl.getVersion(), url.getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(), url.getGroup());
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ address = new Address(URL.valueOf("127.0.0.1:12345?Test1=Value1"));
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value")
+ .setScopeModel(applicationModel).setServiceModel(serviceModel);
+ url =
defaultUserSpecifiedServiceAddressBuilder.rebuildAddress(Collections.emptyList(),
address, Mockito.mock(Invocation.class), consumerUrl);
+ Assertions.assertEquals("127.0.0.1", url.getHost());
+ Assertions.assertEquals(12345, url.getPort());
+ Assertions.assertEquals("Value", url.getParameter("Test"));
+ Assertions.assertEquals("Value1", url.getParameter("Test1"));
+ Assertions.assertEquals(consumerUrl.getVersion(), url.getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(), url.getGroup());
+ Assertions.assertEquals(serviceModel, url.getServiceModel());
+ Assertions.assertEquals(applicationModel, url.getScopeModel());
+
+ applicationModel.destroy();
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DemoService.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DemoService.java
new file mode 100644
index 0000000..a315edb
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/DemoService.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+public interface DemoService {
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCacheTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCacheTest.java
new file mode 100644
index 0000000..47fc38d
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/InvokerCacheTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.rpc.Invoker;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class InvokerCacheTest {
+ @Test
+ public void test() throws InterruptedException {
+ InvokerCache<Object> cache = new
InvokerCache<>(Mockito.mock(Invoker.class));
+ long originTime = cache.getLastAccess();
+ Thread.sleep(5);
+ cache.getInvoker();
+ Assertions.assertNotEquals(originTime, cache.getLastAccess());
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactoryTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactoryTest.java
new file mode 100644
index 0000000..5276f74
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterFactoryTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.cluster.router.state.StateRouterFactory;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class UserSpecifiedAddressRouterFactoryTest {
+ @Test
+ public void test() {
+ ApplicationModel applicationModel = ApplicationModel.defaultModel();
+
+ StateRouterFactory stateRouterFactory =
applicationModel.getExtensionLoader(StateRouterFactory.class).getExtension("user-specified-address");
+ Assertions.assertEquals(UserSpecifiedAddressRouterFactory.class,
stateRouterFactory.getClass());
+
+ stateRouterFactory.getRouter(Object.class,
URL.valueOf("").setScopeModel(applicationModel.newModule()));
+
+ applicationModel.destroy();
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
new file mode 100644
index 0000000..4993e64
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressRouterTest.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.router.state.BitList;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ServiceModel;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+public class UserSpecifiedAddressRouterTest {
+ private ApplicationModel applicationModel;
+ private URL consumerUrl;
+
+ @BeforeEach
+ public void setup() {
+ applicationModel = ApplicationModel.defaultModel();
+ ServiceModel serviceModel = Mockito.mock(ServiceModel.class);
+
Mockito.when(serviceModel.getServiceInterfaceClass()).thenReturn((Class)DemoService.class);
+ consumerUrl = URL.valueOf("127.0.0.2:20880").addParameter("Test",
"Value").addParameter("check", "false")
+ .addParameter("version", "1.0.0").addParameter("group", "Dubbo")
+
.setScopeModel(applicationModel.newModule()).setServiceModel(serviceModel);
+ }
+
+ @AfterEach
+ public void teardown() {
+ applicationModel.destroy();
+ }
+
+ @Test
+ public void test() {
+ Assertions.assertTrue(new
UserSpecifiedAddressRouter<>(URL.valueOf("").setScopeModel(applicationModel.newModule())).supportContinueRoute());
+ }
+
+ @Test
+ public void testNotify() {
+ UserSpecifiedAddressRouter<Object> userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter<>(consumerUrl);
+ Assertions.assertEquals(BitList.emptyList(),
userSpecifiedAddressRouter.getInvokers());
+ Assertions.assertNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNull(userSpecifiedAddressRouter.getIp2Invoker());
+ userSpecifiedAddressRouter.notify(BitList.emptyList());
+ Assertions.assertEquals(BitList.emptyList(),
userSpecifiedAddressRouter.getInvokers());
+ Assertions.assertNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNull(userSpecifiedAddressRouter.getIp2Invoker());
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.1", 0));
+
+ // no address
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null));
+
+
Assertions.assertNotNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNotNull(userSpecifiedAddressRouter.getIp2Invoker());
+
+ userSpecifiedAddressRouter.notify(BitList.emptyList());
+ Assertions.assertEquals(BitList.emptyList(),
userSpecifiedAddressRouter.getInvokers());
+
Assertions.assertNotNull(userSpecifiedAddressRouter.getAddress2Invoker());
+ Assertions.assertNotNull(userSpecifiedAddressRouter.getIp2Invoker());
+ }
+
+ @Test
+ public void testGetInvokerByURL() {
+ UserSpecifiedAddressRouter<Object> userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter<>(consumerUrl);
+
+ Assertions.assertEquals(BitList.emptyList(),
+ userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null));
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880")));
+ BitList<Invoker<Object>> invokers =
userSpecifiedAddressRouter.doRoute(BitList.emptyList(), consumerUrl,
Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+
+ Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
+
Mockito.when(mockInvoker.getUrl()).thenReturn(URL.valueOf("simple://127.0.0.1:20880?Test1=Value"));
+
+ userSpecifiedAddressRouter.notify(new
BitList<>(Collections.singletonList(mockInvoker)));
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880")));
+ invokers = userSpecifiedAddressRouter.doRoute(new
BitList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ userSpecifiedAddressRouter.notify(new
BitList<>(Collections.singletonList(mockInvoker)));
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880?Test1=Value")));
+ invokers = userSpecifiedAddressRouter.doRoute(new
BitList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ userSpecifiedAddressRouter.notify(new
BitList<>(Collections.singletonList(mockInvoker)));
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("simple://127.0.0.1:20880")));
+ invokers = userSpecifiedAddressRouter.doRoute(new
BitList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("dubbo://127.0.0.1:20880")));
+ invokers = userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20770")));
+ invokers = userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20770, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20770?Test1=Value1")));
+ invokers = userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20770, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value1",
invokers.get(0).getUrl().getParameter("Test1"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.2:20770?Test1=Value1")));
+ invokers = userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.2",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20770, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value1",
invokers.get(0).getUrl().getParameter("Test1"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880?Test1=Value&Test2=Value&Test3=Value")));
+ invokers = userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test1"));
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test2"));
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test3"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+ }
+
+ @Test
+ public void testGetInvokerByIp() {
+ UserSpecifiedAddressRouter<Object> userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter<>(consumerUrl);
+
+ Assertions.assertEquals(BitList.emptyList(),
+ userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null));
+
+ Invoker<Object> mockInvoker = Mockito.mock(Invoker.class);
+ Mockito.when(mockInvoker.getUrl()).thenReturn(consumerUrl);
+
+ userSpecifiedAddressRouter.notify(new
BitList<>(Collections.singletonList(mockInvoker)));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 0));
+ BitList<Invoker<Object>> invokers =
userSpecifiedAddressRouter.doRoute(new
BitList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20880));
+ invokers = userSpecifiedAddressRouter.doRoute(new
BitList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals(mockInvoker, invokers.get(0));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770));
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.doRoute(new
BitList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class), false, null, null));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.3", 20880));
+ Assertions.assertThrows(RpcException.class, () ->
+ userSpecifiedAddressRouter.doRoute(new
BitList<>(Collections.singletonList(mockInvoker)), consumerUrl,
Mockito.mock(Invocation.class), false, null, null));
+
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.2", 20770,
true));
+ invokers = userSpecifiedAddressRouter.doRoute(BitList.emptyList(),
consumerUrl, Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.2",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20770, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+ }
+
+ @Test
+ public void testRemovalTask() throws InterruptedException {
+ UserSpecifiedAddressRouter.EXPIRE_TIME = 10;
+ UserSpecifiedAddressRouter<Object> userSpecifiedAddressRouter = new
UserSpecifiedAddressRouter<>(consumerUrl);
+
+ UserSpecifiedAddressUtil.setAddress(new
Address(URL.valueOf("127.0.0.1:20880")));
+ BitList<Invoker<Object>> invokers =
userSpecifiedAddressRouter.doRoute(BitList.emptyList(), consumerUrl,
Mockito.mock(Invocation.class), false, null, null);
+ Assertions.assertEquals(1, invokers.size());
+ Assertions.assertEquals("127.0.0.1",
invokers.get(0).getUrl().getHost());
+ Assertions.assertEquals(20880, invokers.get(0).getUrl().getPort());
+ Assertions.assertEquals("Value",
invokers.get(0).getUrl().getParameter("Test"));
+ Assertions.assertEquals(consumerUrl.getVersion(),
invokers.get(0).getUrl().getVersion());
+ Assertions.assertEquals(consumerUrl.getGroup(),
invokers.get(0).getUrl().getGroup());
+
+ Assertions.assertEquals(1,
userSpecifiedAddressRouter.getNewInvokerCache().size());
+ Thread.sleep(50);
+ Assertions.assertEquals(0,
userSpecifiedAddressRouter.getNewInvokerCache().size());
+
+ userSpecifiedAddressRouter.stop();
+ UserSpecifiedAddressRouter.EXPIRE_TIME = 10 * 60 * 1000;
+
+ }
+}
diff --git
a/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtilTest.java
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtilTest.java
new file mode 100644
index 0000000..0dc1c16
--- /dev/null
+++
b/dubbo-cluster-extensions/dubbo-cluster-specify-address-dubbo3/src/test/java/org/apache/dubbo/rpc/cluster/specifyaddress/UserSpecifiedAddressUtilTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.cluster.specifyaddress;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class UserSpecifiedAddressUtilTest {
+ @Test
+ public void test() {
+ Assertions.assertNull(UserSpecifiedAddressUtil.getAddress());
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.1", 0));
+ Assertions.assertEquals(new Address("127.0.0.1", 0),
UserSpecifiedAddressUtil.getAddress());
+ Assertions.assertNull(UserSpecifiedAddressUtil.getAddress());
+ UserSpecifiedAddressUtil.setAddress(new Address("127.0.0.1", 12345));
+ Assertions.assertNotEquals(new Address("127.0.0.1", 0),
UserSpecifiedAddressUtil.getAddress());
+ Assertions.assertNull(UserSpecifiedAddressUtil.getAddress());
+ }
+}
diff --git a/dubbo-cluster-extensions/pom.xml b/dubbo-cluster-extensions/pom.xml
index 4bfd100..0962a4c 100644
--- a/dubbo-cluster-extensions/pom.xml
+++ b/dubbo-cluster-extensions/pom.xml
@@ -36,6 +36,8 @@
<modules>
<module>dubbo-cluster-broadcast-1</module>
<module>dubbo-cluster-loadbalance-peakewma</module>
+ <module>dubbo-cluster-specify-address-dubbo3</module>
+ <module>dubbo-cluster-specify-address-dubbo2</module>
</modules>