This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new ee6db0a9af fix executor datasource loss (#10685)
ee6db0a9af is described below

commit ee6db0a9af28d7515c3b5055d185483d7fd4c107
Author: huazhongming <[email protected]>
AuthorDate: Fri Sep 30 14:21:59 2022 +0800

    fix executor datasource loss (#10685)
    
    Signed-off-by: crazyhzm <[email protected]>
    
    Signed-off-by: crazyhzm <[email protected]>
---
 .../org/apache/dubbo/common/constants/CommonConstants.java |  2 ++
 .../threadpool/manager/DefaultExecutorRepository.java      | 14 +++++++++++++-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index bd4e256528..aad6f49ca0 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -112,6 +112,8 @@ public interface CommonConstants {
 
     String EXECUTOR_SERVICE_COMPONENT_KEY = ExecutorService.class.getName();
 
+    String CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY = "CONSUMER_SHARED_SERVICE_EXECUTOR";
+
     String INTERNAL_EXECUTOR_SERVICE_COMPONENT_KEY = "INTERNAL_SERVICE_EXECUTOR";
 
     String THREADPOOL_KEY = "threadpool";
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index 223ad944a8..c337acaf3e 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -21,6 +21,7 @@ import org.apache.dubbo.common.extension.ExtensionAccessor;
 import org.apache.dubbo.common.extension.ExtensionAccessorAware;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.store.DataStore;
 import org.apache.dubbo.common.threadpool.ThreadPool;
 import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
@@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_EXPORT_THREAD_NUM;
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_PROTOCOL;
@@ -68,9 +70,12 @@ public class DefaultExecutorRepository implements ExecutorRepository, ExtensionA
     private final ApplicationModel applicationModel;
     private final FrameworkExecutorRepository frameworkExecutorRepository;
 
+    private final DataStore dataStore;
+
     public DefaultExecutorRepository(ApplicationModel applicationModel) {
         this.applicationModel = applicationModel;
         this.frameworkExecutorRepository = applicationModel.getFrameworkModel().getBeanFactory().getBean(FrameworkExecutorRepository.class);
+        this.dataStore = applicationModel.getExtensionLoader(DataStore.class).getDefaultExtension();
     }
 
     /**
@@ -81,7 +86,8 @@ public class DefaultExecutorRepository implements ExecutorRepository, ExtensionA
      */
     @Override
     public synchronized ExecutorService createExecutorIfAbsent(URL url) {
-        Map<Integer, ExecutorService> executors = data.computeIfAbsent(getExecutorKey(url), k -> new ConcurrentHashMap<>());
+        String executorKey = getExecutorKey(url);
+        Map<Integer, ExecutorService> executors = data.computeIfAbsent(executorKey, k -> new ConcurrentHashMap<>());
         // Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.
         Integer portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? Integer.MAX_VALUE : url.getPort();
 
@@ -101,6 +107,8 @@ public class DefaultExecutorRepository implements ExecutorRepository, ExtensionA
             executor = createExecutor(url);
             executors.put(portKey, executor);
         }
+
+        dataStore.put(executorKey, Integer.toString(portKey), executor);
         return executor;
     }
 
@@ -118,6 +126,10 @@ public class DefaultExecutorRepository implements ExecutorRepository, ExtensionA
             executorKey = EXECUTOR_SERVICE_COMPONENT_KEY;
 
         }
+
+        if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))){
+            executorKey = CONSUMER_SHARED_EXECUTOR_SERVICE_COMPONENT_KEY;
+        }
         return executorKey;
     }
 

Reply via email to