This is an automated email from the ASF dual-hosted git repository.

linghengqian pushed a commit to branch revert-2475-feat-support-interrupt in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
commit 942750af7640850e6dda183ef5e63bcc211f6dc1 Author: Ling Hengqian <[email protected]> AuthorDate: Thu Sep 11 15:54:33 2025 +0800 Revert "feat: Support business code capture interruption during exit (#2475)" This reverts commit 5f2cea628c917701325919b03031b17607c26bc8. --- .../threadpool/ExecutorServiceReloader.java | 4 +- .../kernel/internal/schedule/JobScheduler.java | 2 - .../kernel/internal/schedule/LiteJob.java | 22 ++--------- .../threadpool/ExecutorServiceReloaderTest.java | 4 +- .../test/natived/it/operation/JavaTest.java | 44 ---------------------- 5 files changed, 7 insertions(+), 69 deletions(-) diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java index 4f377e6a4..25769242b 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloader.java @@ -47,7 +47,7 @@ public final class ExecutorServiceReloader implements Closeable { if (jobExecutorThreadPoolSizeProviderType.equals(jobConfig.getJobExecutorThreadPoolSizeProviderType())) { return; } - executorService.shutdownNow(); + executorService.shutdown(); init(jobConfig); } @@ -59,6 +59,6 @@ public final class ExecutorServiceReloader implements Closeable { @Override public void close() { - executorService.shutdownNow(); + executorService.shutdown(); } } diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java index 859ccd58b..fbeab942e 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java +++ 
b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/JobScheduler.java @@ -178,7 +178,6 @@ public final class JobScheduler { result.put("org.quartz.jobStore.misfireThreshold", "1"); result.put("org.quartz.plugin.shutdownhook.class", JobShutdownHookPlugin.class.getName()); result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString()); - result.put("org.quartz.scheduler.interruptJobsOnShutdown", Boolean.TRUE.toString()); return result; } @@ -201,7 +200,6 @@ public final class JobScheduler { public void shutdown() { setUpFacade.tearDown(); schedulerFacade.shutdownInstance(); - jobScheduleController.shutdown(false); jobExecutor.shutdown(); } } diff --git a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java index 9b1ec359d..fdf57b99e 100644 --- a/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java +++ b/kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/schedule/LiteJob.java @@ -19,36 +19,20 @@ package org.apache.shardingsphere.elasticjob.kernel.internal.schedule; import lombok.Setter; import org.apache.shardingsphere.elasticjob.kernel.executor.ElasticJobExecutor; -import org.quartz.InterruptableJob; +import org.quartz.Job; import org.quartz.JobExecutionContext; -import org.quartz.UnableToInterruptJobException; - -import java.util.Objects; /** * Lite job. 
*/ @Setter -public final class LiteJob implements InterruptableJob { +public final class LiteJob implements Job { private ElasticJobExecutor jobExecutor; - private volatile Thread currentThread; - @Override public void execute(final JobExecutionContext context) { - try { - currentThread = Thread.currentThread(); - jobExecutor.execute(); - } finally { - currentThread = null; - } + jobExecutor.execute(); } - @Override - public void interrupt() throws UnableToInterruptJobException { - if (Objects.nonNull(currentThread)) { - currentThread.interrupt(); - } - } } diff --git a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java index 1ec2a3ebb..55deeead5 100644 --- a/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java +++ b/kernel/src/test/java/org/apache/shardingsphere/elasticjob/kernel/executor/threadpool/ExecutorServiceReloaderTest.java @@ -57,7 +57,7 @@ class ExecutorServiceReloaderTest { ReflectionUtils.setFieldValue(executorServiceReloader, "executorService", mockExecutorService); JobConfiguration jobConfig = JobConfiguration.newBuilder("job", 1).build(); executorServiceReloader.reloadIfNecessary(jobConfig); - verify(mockExecutorService).shutdownNow(); + verify(mockExecutorService).shutdown(); ExecutorService actual = executorServiceReloader.getExecutorService(); assertFalse(actual.isShutdown()); assertFalse(actual.isTerminated()); @@ -81,6 +81,6 @@ class ExecutorServiceReloaderTest { ExecutorServiceReloader executorServiceReloader = new ExecutorServiceReloader(JobConfiguration.newBuilder("job", 1).jobExecutorThreadPoolSizeProviderType("SINGLE_THREAD").build()); ReflectionUtils.setFieldValue(executorServiceReloader, "executorService", mockExecutorService); executorServiceReloader.close(); - 
verify(mockExecutorService).shutdownNow(); + verify(mockExecutorService).shutdown(); } } diff --git a/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java b/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java index 7fb5ac529..939866716 100644 --- a/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java +++ b/test/native/src/test/java/org/apache/shardingsphere/elasticjob/test/natived/it/operation/JavaTest.java @@ -44,7 +44,6 @@ import org.apache.shardingsphere.elasticjob.lifecycle.internal.statistics.Shardi import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; -import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.apache.shardingsphere.elasticjob.test.natived.commons.job.simple.JavaSimpleJob; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; @@ -55,13 +54,11 @@ import org.junit.jupiter.api.condition.EnabledInNativeImage; import javax.sql.DataSource; import java.io.IOException; import java.time.Duration; -import java.time.LocalTime; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; @@ -308,45 +305,4 @@ class JavaTest { }); job.shutdown(); } - - @Test - void testWhenShutdownThenTaskCanCaptureInterruptedException() throws Exception { - testCaptureInterruptedException(1); - testCaptureInterruptedException(2); - } - - private void testCaptureInterruptedException(final int shardingTotalCount) throws Exception { - String jobName = "testTaskCaptureInterruptedTask" + 
shardingTotalCount; - AtomicBoolean captured = new AtomicBoolean(false); - AtomicBoolean running = new AtomicBoolean(false); - LocalTime oneSecondsLater = LocalTime.now().plusSeconds(1); - String cronExpression = String.format("%d %d %d * * ?", oneSecondsLater.getSecond(), oneSecondsLater.getMinute(), oneSecondsLater.getHour()); - SimpleJob captureInterruptedTask = shardingContext -> { - try { - running.set(true); - - while (true) { - if (Thread.currentThread().isInterrupted()) { - captured.set(true); - running.set(false); - break; - } - System.out.println("Running..."); - Thread.sleep(100); - } - } catch (final InterruptedException e) { - captured.set(true); - running.set(false); - Thread.currentThread().interrupt(); - } - }; - ScheduleJobBootstrap job = new ScheduleJobBootstrap(firstRegCenter, captureInterruptedTask, - JobConfiguration.newBuilder(jobName, shardingTotalCount) - .cron(cronExpression) - .build()); - job.schedule(); - Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(100L, TimeUnit.MILLISECONDS).until(running::get); - job.shutdown(); - Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(100L, TimeUnit.MILLISECONDS).untilAsserted(() -> assertThat(captured.get(), is(true))); - } }
