This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new ee6db0a9af fix executor datasource loss (#10685)
ee6db0a9af is described below
commit ee6db0a9af28d7515c3b5055d185483d7fd4c107
Author: huazhongming <[email protected]>
AuthorDate: Fri Sep 30 14:21:59 2022 +0800
fix executor datasource loss (#10685)
Signed-off-by: crazyhzm <[email protected]>
Signed-off-by: crazyhzm <[email protected]>
---
.../org/apache/dubbo/common/constants/CommonConstants.java | 2 ++
.../threadpool/manager/DefaultExecutorRepository.java | 14 +++++++++++++-
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index bd4e256528..aad6f49ca0 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -112,6 +112,8 @@ public interface CommonConstants {
String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
+ String CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY =
"CONSUMER_SHARED_SERVICE_EXECUTOR";
+
String INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY =
"INTERNAL_SERVICE_EXECUTOR";
String THREADPOOL_KEY = "threadpool";
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index 223ad944a8..c337acaf3e 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.extension.ExtensionAccessor;
import org.apache.dubbo.common.extension.ExtensionAccessorAware;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.store.DataStore;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import static
org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_EXPORT_THREAD_NUM;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL;
@@ -68,9 +70,12 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
private final ApplicationModel applicationModel;
private final FrameworkExecutorRepository frameworkExecutorRepository;
+ private final DataStore dataStore;
+
public DefaultExecutorRepository(ApplicationModel applicationModel) {
this.applicationModel = applicationModel;
this.frameworkExecutorRepository =
applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class);
+ this.dataStore =
applicationModel.getExtensionLoader(DataStore.class).getDefaultExtension();
}
/**
@@ -81,7 +86,8 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
*/
@Override
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
- Map<Integer, ExecutorService> executors =
data.computeIfAbsent(getExecutorKey(url), k -> new ConcurrentHashMap<>());
+ String executorKey = getExecutorKey(url);
+ Map<Integer, ExecutorService> executors =
data.computeIfAbsent(executorKey, k -> new ConcurrentHashMap<>());
// Consumer's executor is sharing globally, key=Integer.MAX_VALUE.
Provider's executor is sharing by protocol.
Integer portKey =
CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE
: url.getPort();
@@ -101,6 +107,8 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
executor = createExecutor(url);
executors.put(portKey, executor);
}
+
+ dataStore.put(executorKey, Integer.toString(portKey), executor);
return executor;
}
@@ -118,6 +126,10 @@ public class DefaultExecutorRepository implements
ExecutorRepository, ExtensionA
executorKey = EXECUTOR_SERVICE_COMPONENT_KEY;
}
+
+ if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))){
+ executorKey = CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
+ }
return executorKey;
}