This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 17b75a7895 Refresh valid invokers after connectivity check (#13773)
17b75a7895 is described below
commit 17b75a78955effbf82f4cd958c37545209c152e1
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Feb 29 11:26:11 2024 +0800
Refresh valid invokers after connectivity check (#13773)
---
.../rpc/cluster/directory/AbstractDirectory.java | 167 +++++++++++++--------
.../org/apache/dubbo/common/utils/LockUtils.java | 54 +++++++
.../apache/dubbo/common/utils/LockUtilsTest.java | 144 ++++++++++++++++++
3 files changed, 301 insertions(+), 64 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index 087eacbade..be68810f4a 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -25,6 +25,7 @@ import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
+import org.apache.dubbo.common.utils.LockUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metrics.event.MetricsEventBus;
@@ -56,6 +57,8 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
@@ -122,6 +125,8 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
private volatile ScheduledFuture<?> connectivityCheckFuture;
+ private final ReentrantLock invokerRefreshLock = new ReentrantLock();
+
/**
* The max count of invokers for each reconnect task select to try to
reconnect.
*/
@@ -293,17 +298,19 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
@Override
public void addInvalidateInvoker(Invoker<T> invoker) {
- // 1. remove this invoker from validInvokers list, this invoker will
not be listed in the next time
- if (removeValidInvoker(invoker)) {
- // 2. add this invoker to reconnect list
- invokersToReconnect.add(invoker);
- // 3. try start check connectivity task
- checkConnectivity();
-
- logger.info("The invoker " + invoker.getUrl()
- + " has been added to invalidate list due to connectivity
problem. "
- + "Will trying to reconnect to it in the background.");
- }
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ // 1. remove this invoker from validInvokers list, this invoker
will not be listed in the next time
+ if (removeValidInvoker(invoker)) {
+ // 2. add this invoker to reconnect list
+ invokersToReconnect.add(invoker);
+ // 3. try start check connectivity task
+ checkConnectivity();
+
+ logger.info("The invoker " + invoker.getUrl()
+ + " has been added to invalidate list due to
connectivity problem. "
+ + "Will trying to reconnect to it in the background.");
+ }
+ });
}
public void checkConnectivity() {
@@ -322,23 +329,30 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
// 1. pick invokers from invokersToReconnect
// limit max reconnectTaskTryCount, prevent this
task hang up all the connectivityExecutor
// for long time
- if (invokersToReconnect.size() <
reconnectTaskTryCount) {
- invokersToTry.addAll(invokersToReconnect);
- } else {
- for (int i = 0; i < reconnectTaskTryCount;
i++) {
- Invoker<T> tInvoker =
invokersToReconnect.get(
-
ThreadLocalRandom.current().nextInt(invokersToReconnect.size()));
- if (!invokersToTry.contains(tInvoker)) {
- // ignore if is selected,
invokersToTry's size is always smaller than
- // reconnectTaskTryCount + 1
- invokersToTry.add(tInvoker);
+ LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+ if (invokersToReconnect.size() <
reconnectTaskTryCount) {
+ invokersToTry.addAll(invokersToReconnect);
+ } else {
+ for (int i = 0; i < reconnectTaskTryCount;
i++) {
+ Invoker<T> tInvoker =
invokersToReconnect.get(
+
ThreadLocalRandom.current().nextInt(invokersToReconnect.size()));
+ if (!invokersToTry.contains(tInvoker))
{
+ // ignore if is selected,
invokersToTry's size is always smaller than
+ // reconnectTaskTryCount + 1
+ invokersToTry.add(tInvoker);
+ }
}
}
- }
+ });
// 2. try to check the invoker's status
for (Invoker<T> invoker : invokersToTry) {
- if (invokers.contains(invoker)) {
+ AtomicBoolean invokerExist = new
AtomicBoolean(false);
+ LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+
invokerExist.set(invokers.contains(invoker));
+ });
+ // Should not lock here, `invoker.isAvailable`
may need some time to check
+ if (invokerExist.get()) {
if (invoker.isAvailable()) {
needDeleteList.add(invoker);
}
@@ -348,22 +362,37 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
}
// 3. recover valid invoker
- for (Invoker<T> tInvoker : needDeleteList) {
- if (invokers.contains(tInvoker)) {
- addValidInvoker(tInvoker);
- logger.info(
- "Recover service address: " +
tInvoker.getUrl() + " from invalid list.");
+ LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+ for (Invoker<T> tInvoker : needDeleteList) {
+ if (invokers.contains(tInvoker)) {
+ addValidInvoker(tInvoker);
+ logger.info("Recover service address:
" + tInvoker.getUrl()
+ + " from invalid list.");
+ } else {
+ logger.info(
+ "The invoker " +
tInvoker.getUrl()
+ + " has been removed
from invokers list. Will remove it in reconnect list.");
+ }
+ invokersToReconnect.remove(tInvoker);
}
- invokersToReconnect.remove(tInvoker);
- }
+ });
+ } catch (Throwable t) {
+ logger.error(
+ LoggerCodeConstants.INTERNAL_ERROR,
+ "",
+ "",
+ "Error occurred when check connectivity. ",
+ t);
} finally {
checkConnectivityPermit.release();
}
// 4. submit new task if it has more to recover
- if (!invokersToReconnect.isEmpty()) {
- checkConnectivity();
- }
+ LockUtils.safeLock(invokerRefreshLock,
LockUtils.DEFAULT_TIMEOUT, () -> {
+ if (!invokersToReconnect.isEmpty()) {
+ checkConnectivity();
+ }
+ });
MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(
applicationModel, getSummary(),
getDirectoryMeta()));
},
@@ -382,9 +411,11 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
* 4. all the invokers disappeared from total invokers should be removed
in the disabled invokers list
*/
public void refreshInvoker() {
- if (invokersInitialized) {
- refreshInvokerInternal();
- }
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ if (invokersInitialized) {
+ refreshInvokerInternal();
+ }
+ });
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary(), getDirectoryMeta()));
}
@@ -393,7 +424,7 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
return Collections.emptyMap();
}
- private synchronized void refreshInvokerInternal() {
+ private void refreshInvokerInternal() {
BitList<Invoker<T>> copiedInvokers = invokers.clone();
refreshInvokers(copiedInvokers, invokersToReconnect);
refreshInvokers(copiedInvokers, disabledInvokers);
@@ -414,25 +445,29 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
@Override
public void addDisabledInvoker(Invoker<T> invoker) {
- if (invokers.contains(invoker)) {
- disabledInvokers.add(invoker);
- removeValidInvoker(invoker);
- logger.info("Disable service address: " + invoker.getUrl() + ".");
- }
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ if (invokers.contains(invoker)) {
+ disabledInvokers.add(invoker);
+ removeValidInvoker(invoker);
+ logger.info("Disable service address: " + invoker.getUrl() +
".");
+ }
+ });
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary(), getDirectoryMeta()));
}
@Override
public void recoverDisabledInvoker(Invoker<T> invoker) {
- if (disabledInvokers.remove(invoker)) {
- try {
- addValidInvoker(invoker);
- logger.info("Recover service address: " + invoker.getUrl() + "
from disabled list.");
- } catch (Throwable ignore) {
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ if (disabledInvokers.remove(invoker)) {
+ try {
+ addValidInvoker(invoker);
+ logger.info("Recover service address: " + invoker.getUrl()
+ " from disabled list.");
+ } catch (Throwable ignore) {
+ }
}
- }
+ });
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary(), getDirectoryMeta()));
}
@@ -491,9 +526,11 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
}
protected void setInvokers(BitList<Invoker<T>> invokers) {
- this.invokers = invokers;
- refreshInvokerInternal();
- this.invokersInitialized = true;
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ this.invokers = invokers;
+ refreshInvokerInternal();
+ this.invokersInitialized = true;
+ });
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary(), getDirectoryMeta()));
@@ -501,29 +538,31 @@ public abstract class AbstractDirectory<T> implements
Directory<T> {
protected void destroyInvokers() {
// set empty instead of clearing to support concurrent access.
- this.invokers = BitList.emptyList();
- this.validInvokers = BitList.emptyList();
- this.invokersInitialized = false;
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ this.invokers = BitList.emptyList();
+ this.validInvokers = BitList.emptyList();
+ this.invokersInitialized = false;
+ });
}
private boolean addValidInvoker(Invoker<T> invoker) {
- boolean result;
- synchronized (this.validInvokers) {
- result = this.validInvokers.add(invoker);
- }
+ AtomicBoolean result = new AtomicBoolean(false);
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ result.set(this.validInvokers.add(invoker));
+ });
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary(), getDirectoryMeta()));
- return result;
+ return result.get();
}
private boolean removeValidInvoker(Invoker<T> invoker) {
- boolean result;
- synchronized (this.validInvokers) {
- result = this.validInvokers.remove(invoker);
- }
+ AtomicBoolean result = new AtomicBoolean(false);
+ LockUtils.safeLock(invokerRefreshLock, LockUtils.DEFAULT_TIMEOUT, ()
-> {
+ result.set(this.validInvokers.remove(invoker));
+ });
MetricsEventBus.publish(
RegistryEvent.refreshDirectoryEvent(applicationModel,
getSummary(), getDirectoryMeta()));
- return result;
+ return result.get();
}
protected abstract List<Invoker<T>> doList(
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java
new file mode 100644
index 0000000000..f441dc9b2f
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/LockUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.utils;
+
+import org.apache.dubbo.common.constants.LoggerCodeConstants;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+
+public class LockUtils {
+    private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(LockUtils.class);
+
+    public static final int DEFAULT_TIMEOUT = 60_000;
+
+    /**
+     * Runs {@code runnable} after waiting up to {@code timeout} ms for {@code lock}. On timeout the task still runs
+     * (unlocked, error logged); on interrupt it is skipped. Only a lock acquired by this call is released.
+     */
+    public static void safeLock(Lock lock, int timeout, Runnable runnable) {
+        boolean acquired = false;
+        try {
+            acquired = lock.tryLock(timeout, TimeUnit.MILLISECONDS);
+            if (!acquired) {
+                String message = "Try to lock failed, timeout: " + timeout;
+                logger.error(LoggerCodeConstants.INTERNAL_ERROR, "", "", message, new TimeoutException());
+            }
+            runnable.run();
+        } catch (InterruptedException e) {
+            logger.warn(LoggerCodeConstants.INTERNAL_ERROR, "", "", "Try to lock failed", e);
+            Thread.currentThread().interrupt(); // restore the flag that tryLock cleared
+        } finally {
+            if (acquired) { // never release a hold this call did not take, e.g. an outer reentrant one
+                lock.unlock();
+            }
+        }
+    }
+}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java
new file mode 100644
index 0000000000..87c1824930
--- /dev/null
+++ b/dubbo-common/src/test/java/org/apache/dubbo/common/utils/LockUtilsTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.common.utils;
+
+import java.lang.Thread.State;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.RepeatedTest;
+
+import static org.awaitility.Awaitility.await;
+/** Tests LockUtils.safeLock: wait timeout, reentrant entry, interruption while waiting, and a delayed release. */
+public class LockUtilsTest {
+ @RepeatedTest(5)
+ void testLockFailed() {
+ ReentrantLock reentrantLock = new ReentrantLock();
+ AtomicBoolean releaseLock = new AtomicBoolean(false);
+ new Thread(() -> {
+ reentrantLock.lock();
+ while (!releaseLock.get()) {
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ reentrantLock.unlock();
+ })
+ .start();
+ // Wait until the helper thread actually holds the lock.
+ await().until(reentrantLock::isLocked);
+ // Lock is held by the helper: the runnable is expected to run only after the full 1000 ms wait.
+ AtomicLong lockTime = new AtomicLong(0);
+ long startTime = System.currentTimeMillis();
+ LockUtils.safeLock(reentrantLock, 1000, () -> {
+ lockTime.set(System.currentTimeMillis());
+ });
+ Assertions.assertTrue(lockTime.get() - startTime >= 1000);
+ releaseLock.set(true);
+ // Let the helper unlock, then poll until the lock is free again.
+ while (reentrantLock.isLocked()) {
+ try {
+ Thread.sleep(5);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ // Lock is free now: the runnable is expected to run in under 1000 ms.
+ lockTime.set(0);
+ startTime = System.currentTimeMillis();
+ LockUtils.safeLock(reentrantLock, 1000, () -> {
+ lockTime.set(System.currentTimeMillis());
+ });
+ Assertions.assertTrue(lockTime.get() - startTime < 1000);
+ }
+ // The calling thread already owns the lock: safeLock must enter promptly and leave the outer holds in place.
+ @RepeatedTest(5)
+ void testReentrant() {
+ ReentrantLock reentrantLock = new ReentrantLock();
+ reentrantLock.lock();
+ // One explicit hold: entering through safeLock must not wait.
+ AtomicLong lockTime = new AtomicLong(0);
+ long startTime = System.currentTimeMillis();
+ LockUtils.safeLock(reentrantLock, 1000, () -> {
+ lockTime.set(System.currentTimeMillis());
+ });
+ Assertions.assertTrue(lockTime.get() - startTime < 1000);
+ // Second explicit hold: entering through safeLock must still not wait.
+ reentrantLock.lock();
+ lockTime.set(0);
+ startTime = System.currentTimeMillis();
+ LockUtils.safeLock(reentrantLock, 1000, () -> {
+ lockTime.set(System.currentTimeMillis());
+ });
+ Assertions.assertTrue(lockTime.get() - startTime < 1000);
+ // Both explicit lock() calls must still be held; it takes two unlock() calls to free the lock.
+ Assertions.assertTrue(reentrantLock.isLocked());
+ reentrantLock.unlock();
+ Assertions.assertTrue(reentrantLock.isLocked());
+ reentrantLock.unlock();
+ Assertions.assertFalse(reentrantLock.isLocked());
+ }
+ // A thread interrupted while waiting inside safeLock must terminate without running its runnable.
+ @RepeatedTest(5)
+ void testInterrupt() {
+ ReentrantLock reentrantLock = new ReentrantLock();
+ reentrantLock.lock();
+ // The test thread owns the lock, so the worker has to wait inside safeLock.
+ AtomicBoolean locked = new AtomicBoolean(false);
+ Thread thread = new Thread(() -> {
+ LockUtils.safeLock(reentrantLock, 10000, () -> {
+ locked.set(true);
+ });
+ });
+ thread.start();
+ // TIMED_WAITING is used as the signal that the worker is blocked in the timed lock attempt.
+ await().until(() -> thread.getState() == State.TIMED_WAITING);
+ thread.interrupt();
+ await().until(() -> thread.getState() == State.TERMINATED);
+
+ Assertions.assertFalse(locked.get());
+
+ reentrantLock.unlock();
+ }
+ // A waiting thread must obtain the lock once the owner releases it, before its own 10000 ms timeout.
+ @RepeatedTest(5)
+ void testHoldLock() throws InterruptedException {
+ ReentrantLock reentrantLock = new ReentrantLock();
+ reentrantLock.lock();
+
+ AtomicLong lockTime = new AtomicLong(0);
+ long startTime = System.currentTimeMillis();
+ Thread thread = new Thread(() -> {
+ LockUtils.safeLock(reentrantLock, 10000, () -> {
+ lockTime.set(System.currentTimeMillis());
+ });
+ });
+ thread.start();
+ // Keep the lock for one more second after the worker is seen waiting, then release it.
+ await().until(() -> thread.getState() == State.TIMED_WAITING);
+ Thread.sleep(1000);
+ reentrantLock.unlock();
+
+ await().until(() -> thread.getState() == State.TERMINATED);
+ Assertions.assertTrue(lockTime.get() - startTime > 1000);
+ Assertions.assertTrue(lockTime.get() - startTime < 10000);
+ }
+}