This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cea3d4e0106 Add more test cases on PipelineExecuteEngine (#33311)
cea3d4e0106 is described below
commit cea3d4e0106abfb29d08152e2082ceba12b768cf
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Oct 19 00:45:48 2024 +0800
Add more test cases on PipelineExecuteEngine (#33311)
---
.../core/execute/PipelineExecuteEngine.java | 29 +++++++++++-----------
.../core/execute/PipelineExecuteEngineTest.java | 25 +++++++++----------
2 files changed, 26 insertions(+), 28 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
index 7a83eb87089..00d41b94895 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
@@ -69,6 +69,16 @@ public final class PipelineExecuteEngine {
return new
PipelineExecuteEngine(Executors.newFixedThreadPool(threadNumber,
ExecutorThreadFactoryBuilder.build(threadNameFormat)));
}
+ /**
+ * Submit a {@code LifecycleExecutor} to execute.
+ *
+ * @param pipelineLifecycleRunnable lifecycle executor
+ * @return execute future
+ */
+ public CompletableFuture<?> submit(final PipelineLifecycleRunnable
pipelineLifecycleRunnable) {
+ return CompletableFuture.runAsync(pipelineLifecycleRunnable,
executorService);
+ }
+
/**
* Submit a {@code LifecycleExecutor} with callback {@code
ExecuteCallback} to execute.
*
@@ -82,30 +92,19 @@ public final class PipelineExecuteEngine {
executeCallback.onSuccess();
} else {
Throwable cause = throwable.getCause();
- executeCallback.onFailure(null != cause ? cause : throwable);
+ executeCallback.onFailure(null == cause ? throwable : cause);
}
}, CALLBACK_EXECUTOR);
}
- /**
- * Submit a {@code LifecycleExecutor} to execute.
- *
- * @param pipelineLifecycleRunnable lifecycle executor
- * @return execute future
- */
- public CompletableFuture<?> submit(final PipelineLifecycleRunnable
pipelineLifecycleRunnable) {
- return CompletableFuture.runAsync(pipelineLifecycleRunnable,
executorService);
- }
-
/**
* Shutdown.
*/
public void shutdown() {
- if (executorService.isShutdown()) {
- return;
+ if (!executorService.isShutdown()) {
+ executorService.shutdown();
+ executorService.shutdownNow();
}
- executorService.shutdown();
- executorService.shutdownNow();
}
/**
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java
index 5050c1a0244..97d5b53aad8 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java
@@ -18,18 +18,14 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
-import org.mockito.internal.configuration.plugins.Plugins;
import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.is;
@@ -42,6 +38,16 @@ import static org.mockito.Mockito.verify;
class PipelineExecuteEngineTest {
+ @Test
+ void assertSubmitWithoutExecuteCallback() {
+ PipelineLifecycleRunnable pipelineLifecycleRunnable =
mock(PipelineLifecycleRunnable.class);
+ PipelineExecuteEngine executeEngine =
PipelineExecuteEngine.newFixedThreadInstance(1,
PipelineExecuteEngineTest.class.getSimpleName());
+ Future<?> future = executeEngine.submit(pipelineLifecycleRunnable);
+ assertTimeout(Duration.ofSeconds(30L), () -> future.get());
+ verify(pipelineLifecycleRunnable).run();
+ executeEngine.shutdown();
+ }
+
@Test
void assertSubmitAndTaskSucceeded() {
PipelineLifecycleRunnable pipelineLifecycleRunnable =
mock(PipelineLifecycleRunnable.class);
@@ -49,9 +55,9 @@ class PipelineExecuteEngineTest {
PipelineExecuteEngine executeEngine =
PipelineExecuteEngine.newCachedThreadInstance(PipelineExecuteEngineTest.class.getSimpleName());
Future<?> future = executeEngine.submit(pipelineLifecycleRunnable,
callback);
assertTimeout(Duration.ofSeconds(30L), () -> future.get());
- shutdownAndAwaitTerminal(executeEngine);
verify(pipelineLifecycleRunnable).run();
verify(callback).onSuccess();
+ executeEngine.shutdown();
}
@Test
@@ -65,8 +71,8 @@ class PipelineExecuteEngineTest {
Optional<Throwable> actualCause =
assertTimeout(Duration.ofSeconds(30L), () -> execute(future));
assertTrue(actualCause.isPresent());
assertThat(actualCause.get(), is(expectedException));
- shutdownAndAwaitTerminal(executeEngine);
verify(callback).onFailure(expectedException);
+ executeEngine.shutdown();
}
private Optional<Throwable> execute(final Future<?> future) throws
InterruptedException {
@@ -78,13 +84,6 @@ class PipelineExecuteEngineTest {
}
}
- @SneakyThrows({ReflectiveOperationException.class,
InterruptedException.class})
- private void shutdownAndAwaitTerminal(final PipelineExecuteEngine
executeEngine) {
- ExecutorService executorService = (ExecutorService)
Plugins.getMemberAccessor().get(PipelineExecuteEngine.class.getDeclaredField("executorService"),
executeEngine);
- executorService.shutdown();
- executorService.awaitTermination(30L, TimeUnit.SECONDS);
- }
-
@Test
void assertTriggerAllSuccess() {
CompletableFuture<?> future1 = CompletableFuture.runAsync(new
FixtureRunnable(true));