This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cea3d4e0106 Add more test cases on PipelineExecuteEngine (#33311)
cea3d4e0106 is described below

commit cea3d4e0106abfb29d08152e2082ceba12b768cf
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Oct 19 00:45:48 2024 +0800

    Add more test cases on PipelineExecuteEngine (#33311)
---
 .../core/execute/PipelineExecuteEngine.java        | 29 +++++++++++-----------
 .../core/execute/PipelineExecuteEngineTest.java    | 25 +++++++++----------
 2 files changed, 26 insertions(+), 28 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
index 7a83eb87089..00d41b94895 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngine.java
@@ -69,6 +69,16 @@ public final class PipelineExecuteEngine {
         return new 
PipelineExecuteEngine(Executors.newFixedThreadPool(threadNumber, 
ExecutorThreadFactoryBuilder.build(threadNameFormat)));
     }
     
+    /**
+     * Submit a {@code LifecycleExecutor} to execute.
+     *
+     * @param pipelineLifecycleRunnable lifecycle executor
+     * @return execute future
+     */
+    public CompletableFuture<?> submit(final PipelineLifecycleRunnable 
pipelineLifecycleRunnable) {
+        return CompletableFuture.runAsync(pipelineLifecycleRunnable, 
executorService);
+    }
+    
     /**
      * Submit a {@code LifecycleExecutor} with callback {@code 
ExecuteCallback} to execute.
      *
@@ -82,30 +92,19 @@ public final class PipelineExecuteEngine {
                 executeCallback.onSuccess();
             } else {
                 Throwable cause = throwable.getCause();
-                executeCallback.onFailure(null != cause ? cause : throwable);
+                executeCallback.onFailure(null == cause ? throwable : cause);
             }
         }, CALLBACK_EXECUTOR);
     }
     
-    /**
-     * Submit a {@code LifecycleExecutor} to execute.
-     *
-     * @param pipelineLifecycleRunnable lifecycle executor
-     * @return execute future
-     */
-    public CompletableFuture<?> submit(final PipelineLifecycleRunnable 
pipelineLifecycleRunnable) {
-        return CompletableFuture.runAsync(pipelineLifecycleRunnable, 
executorService);
-    }
-    
     /**
      * Shutdown.
      */
     public void shutdown() {
-        if (executorService.isShutdown()) {
-            return;
+        if (!executorService.isShutdown()) {
+            executorService.shutdown();
+            executorService.shutdownNow();
         }
-        executorService.shutdown();
-        executorService.shutdownNow();
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java
index 5050c1a0244..97d5b53aad8 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineExecuteEngineTest.java
@@ -18,18 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
 import org.junit.jupiter.api.Test;
-import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -42,6 +38,16 @@ import static org.mockito.Mockito.verify;
 
 class PipelineExecuteEngineTest {
     
+    @Test
+    void assertSubmitWithoutExecuteCallback() {
+        PipelineLifecycleRunnable pipelineLifecycleRunnable = 
mock(PipelineLifecycleRunnable.class);
+        PipelineExecuteEngine executeEngine = 
PipelineExecuteEngine.newFixedThreadInstance(1, 
PipelineExecuteEngineTest.class.getSimpleName());
+        Future<?> future = executeEngine.submit(pipelineLifecycleRunnable);
+        assertTimeout(Duration.ofSeconds(30L), () -> future.get());
+        verify(pipelineLifecycleRunnable).run();
+        executeEngine.shutdown();
+    }
+    
     @Test
     void assertSubmitAndTaskSucceeded() {
         PipelineLifecycleRunnable pipelineLifecycleRunnable = 
mock(PipelineLifecycleRunnable.class);
@@ -49,9 +55,9 @@ class PipelineExecuteEngineTest {
         PipelineExecuteEngine executeEngine = 
PipelineExecuteEngine.newCachedThreadInstance(PipelineExecuteEngineTest.class.getSimpleName());
         Future<?> future = executeEngine.submit(pipelineLifecycleRunnable, 
callback);
         assertTimeout(Duration.ofSeconds(30L), () -> future.get());
-        shutdownAndAwaitTerminal(executeEngine);
         verify(pipelineLifecycleRunnable).run();
         verify(callback).onSuccess();
+        executeEngine.shutdown();
     }
     
     @Test
@@ -65,8 +71,8 @@ class PipelineExecuteEngineTest {
         Optional<Throwable> actualCause = 
assertTimeout(Duration.ofSeconds(30L), () -> execute(future));
         assertTrue(actualCause.isPresent());
         assertThat(actualCause.get(), is(expectedException));
-        shutdownAndAwaitTerminal(executeEngine);
         verify(callback).onFailure(expectedException);
+        executeEngine.shutdown();
     }
     
     private Optional<Throwable> execute(final Future<?> future) throws 
InterruptedException {
@@ -78,13 +84,6 @@ class PipelineExecuteEngineTest {
         }
     }
     
-    @SneakyThrows({ReflectiveOperationException.class, 
InterruptedException.class})
-    private void shutdownAndAwaitTerminal(final PipelineExecuteEngine 
executeEngine) {
-        ExecutorService executorService = (ExecutorService) 
Plugins.getMemberAccessor().get(PipelineExecuteEngine.class.getDeclaredField("executorService"),
 executeEngine);
-        executorService.shutdown();
-        executorService.awaitTermination(30L, TimeUnit.SECONDS);
-    }
-    
     @Test
     void assertTriggerAllSuccess() {
         CompletableFuture<?> future1 = CompletableFuture.runAsync(new 
FixtureRunnable(true));

Reply via email to