This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ea7e071 Close useless executor if put not success (#9485)
ea7e071 is described below
commit ea7e071f90f7e2a3024256ee93000237e6e0d58a
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Feb 24 15:10:49 2021 +0800
Close useless executor if put not success (#9485)
* Rename ScenarioExecutorService
* Close useless executor if put not success
---
.../parallel/ParallelRunnerExecutorFactory.java | 9 ++++--
.../impl/ScenarioParallelRunnerExecutor.java | 35 +++++++++++-----------
2 files changed, 24 insertions(+), 20 deletions(-)
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
index 479de64..6d6a234 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/ParallelRunnerExecutorFactory.java
@@ -23,15 +23,15 @@ import
org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.Cas
import
org.apache.shardingsphere.test.integration.engine.junit.parallel.impl.ScenarioParallelRunnerExecutor;
import java.util.Collection;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
/**
* Parallel runner executor factory.
*/
public final class ParallelRunnerExecutorFactory {
- private final ConcurrentMap<DatabaseType, ParallelRunnerExecutor>
executors = new ConcurrentHashMap<>();
+ private final Map<DatabaseType, ParallelRunnerExecutor> executors = new
ConcurrentHashMap<>();
/**
* Get parallel runner executor.
@@ -44,7 +44,10 @@ public final class ParallelRunnerExecutorFactory {
if (executors.containsKey(databaseType)) {
return executors.get(databaseType);
}
- executors.putIfAbsent(databaseType, newInstance(parallelLevel));
+ ParallelRunnerExecutor newExecutor = newInstance(parallelLevel);
+ if (null != executors.putIfAbsent(databaseType, newExecutor)) {
+ newExecutor.finished();
+ }
return executors.get(databaseType);
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
index a906af9..0541f35 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-suite/src/test/java/org/apache/shardingsphere/test/integration/engine/junit/parallel/impl/ScenarioParallelRunnerExecutor.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.test.integration.engine.junit.parallel.impl;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.EqualsAndHashCode;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.test.integration.engine.junit.parallel.ParallelRunnerExecutor;
import
org.apache.shardingsphere.test.integration.engine.param.model.ParameterizedArray;
@@ -27,7 +26,6 @@ import
org.apache.shardingsphere.test.integration.engine.param.model.Parameteriz
import java.io.Closeable;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -45,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
public final class ScenarioParallelRunnerExecutor implements
ParallelRunnerExecutor {
- private final ConcurrentMap<ScenarioKey, ScenarioExecutorQueue>
scenarioExecutorQueues = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ScenarioKey, ScenarioExecutorService>
scenarioExecutorServices = new ConcurrentHashMap<>();
private final Lock lock = new ReentrantLock();
@@ -56,19 +54,19 @@ public final class ScenarioParallelRunnerExecutor
implements ParallelRunnerExecu
scenarioTaskResults.add(getScenarioExecutorQueue(new
ScenarioKey(parameterizedArray)).submit(childStatement));
}
- private ScenarioExecutorQueue getScenarioExecutorQueue(final ScenarioKey
scenarioKey) {
- ScenarioExecutorQueue scenarioExecutorQueue =
scenarioExecutorQueues.get(scenarioKey);
+ private ScenarioExecutorService getScenarioExecutorQueue(final ScenarioKey
scenarioKey) {
+ ScenarioExecutorService scenarioExecutorQueue =
scenarioExecutorServices.get(scenarioKey);
if (null != scenarioExecutorQueue) {
return scenarioExecutorQueue;
}
try {
lock.lock();
- scenarioExecutorQueue = scenarioExecutorQueues.get(scenarioKey);
+ scenarioExecutorQueue = scenarioExecutorServices.get(scenarioKey);
if (null != scenarioExecutorQueue) {
return scenarioExecutorQueue;
}
- scenarioExecutorQueue = new ScenarioExecutorQueue(scenarioKey);
- scenarioExecutorQueues.put(scenarioKey, scenarioExecutorQueue);
+ scenarioExecutorQueue = new ScenarioExecutorService(scenarioKey);
+ scenarioExecutorServices.put(scenarioKey, scenarioExecutorQueue);
return scenarioExecutorQueue;
} finally {
lock.unlock();
@@ -83,7 +81,7 @@ public final class ScenarioParallelRunnerExecutor implements
ParallelRunnerExecu
} catch (final InterruptedException | ExecutionException ignored) {
}
});
- scenarioExecutorQueues.values().forEach(ScenarioExecutorQueue::close);
+
scenarioExecutorServices.values().forEach(ScenarioExecutorService::close);
}
/**
@@ -111,20 +109,23 @@ public final class ScenarioParallelRunnerExecutor
implements ParallelRunnerExecu
}
/**
- * Scenario executor queue.
+ * Scenario executor service.
*/
- @Setter
- private static final class ScenarioExecutorQueue implements Closeable {
-
- private final BlockingQueue<Runnable> executorQueue;
+ private static final class ScenarioExecutorService implements Closeable {
private final ExecutorService executorService;
- ScenarioExecutorQueue(final ScenarioKey scenarioKey) {
- executorQueue = new LinkedBlockingQueue<>();
- executorService = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, executorQueue, new
ThreadFactoryBuilder().setNameFormat("ScenarioExecutor-" + scenarioKey +
"-pool-%d").build());
+ ScenarioExecutorService(final ScenarioKey scenarioKey) {
+ String threadPoolNameFormat = String.join("-",
"ScenarioExecutorPool", scenarioKey.toString(), "%d");
+ executorService = new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new
ThreadFactoryBuilder().setNameFormat(threadPoolNameFormat).build());
}
+ /**
+ * Submit task.
+ *
+ * @param childStatement child statement
+ * @return task future
+ */
public Future<?> submit(final Runnable childStatement) {
return executorService.submit(childStatement);
}