This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere-elasticjob.git
The following commit(s) were added to refs/heads/master by this push:
new e3e9dec0a Remove Reloadable (#2318)
e3e9dec0a is described below
commit e3e9dec0a6b7e6ac8b44950c9bcb590a3e216d90
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Oct 26 22:54:18 2023 +0800
Remove Reloadable (#2318)
* Remove ExecutorContext
* Remove ExecutorContext
* Move ExecutorServiceReloadable to kernel module
* Remove Reloadable
* Remove Reloadable
* Remove Reloadable
---
...eloadable.java => JobErrorHandlerReloader.java} | 44 ++++++--------
...rdingsphere.elasticjob.infra.context.Reloadable | 18 ------
...eTest.java => JobErrorHandlerReloaderTest.java} | 40 ++++++------
ecosystem/executor/kernel/pom.xml | 5 ++
.../elasticjob/executor/ElasticJobExecutor.java | 23 ++++---
.../executor/context/ExecutorContext.java | 71 ----------------------
.../threadpool/ElasticJobExecutorService.java | 2 +-
.../threadpool/ExecutorServiceReloader.java | 48 ++++++---------
.../JobExecutorThreadPoolSizeProvider.java | 2 +-
.../CPUUsageJobExecutorThreadPoolSizeProvider.java | 4 +-
...gleThreadJobExecutorThreadPoolSizeProvider.java | 4 +-
...or.threadpool.JobExecutorThreadPoolSizeProvider | 4 +-
.../threadpool/ElasticJobExecutorServiceTest.java | 5 +-
.../threadpool/ExecutorServiceReloaderTest.java | 40 ++++++------
...UsageJobExecutorThreadPoolSizeProviderTest.java | 4 +-
...hreadJobExecutorThreadPoolSizeProviderTest.java | 4 +-
.../elasticjob/infra/context/Reloadable.java | 57 -----------------
...rdingsphere.elasticjob.infra.context.Reloadable | 18 ------
18 files changed, 107 insertions(+), 286 deletions(-)
diff --git
a/ecosystem/error-handler/type/general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java
b/ecosystem/error-handler/type/general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloader.java
similarity index 57%
rename from
ecosystem/error-handler/type/general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java
rename to
ecosystem/error-handler/type/general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloader.java
index f81bfc91b..3d5a68a8e 100644
---
a/ecosystem/error-handler/type/general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadable.java
+++
b/ecosystem/error-handler/type/general/src/main/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloader.java
@@ -17,57 +17,47 @@
package org.apache.shardingsphere.elasticjob.error.handler;
-import lombok.extern.slf4j.Slf4j;
+import lombok.Getter;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import java.util.Optional;
+import java.io.Closeable;
import java.util.Properties;
/**
- * JobErrorHandler reloadable.
+ * Job error handler reloader.
*/
-@Slf4j
-public final class JobErrorHandlerReloadable implements
Reloadable<JobErrorHandler> {
+public final class JobErrorHandlerReloader implements Closeable {
private Properties props;
+ @Getter
private JobErrorHandler jobErrorHandler;
- @Override
- public void init(final JobConfiguration jobConfig) {
- props = (Properties) jobConfig.getProps().clone();
- jobErrorHandler = TypedSPILoader.getService(JobErrorHandler.class,
jobConfig.getJobErrorHandlerType(), props);
+ public JobErrorHandlerReloader(final JobConfiguration jobConfig) {
+ init(jobConfig);
}
- @Override
+ /**
+ * Reload if necessary.
+ *
+ * @param jobConfig job configuration
+ */
public synchronized void reloadIfNecessary(final JobConfiguration
jobConfig) {
if
(jobErrorHandler.getType().equals(jobConfig.getJobErrorHandlerType()) &&
props.equals(jobConfig.getProps())) {
return;
}
- log.debug("JobErrorHandler reload occurred in the job '{}'. Change
from '{}' to '{}'.", jobConfig.getJobName(), jobErrorHandler.getType(),
jobConfig.getJobErrorHandlerType());
- reload(jobConfig.getJobErrorHandlerType(), jobConfig.getProps());
- }
-
- private void reload(final String jobErrorHandlerType, final Properties
props) {
jobErrorHandler.close();
- this.props = (Properties) props.clone();
- jobErrorHandler = TypedSPILoader.getService(JobErrorHandler.class,
jobErrorHandlerType, props);
+ init(jobConfig);
}
- @Override
- public JobErrorHandler getInstance() {
- return jobErrorHandler;
- }
-
- @Override
- public Class<JobErrorHandler> getType() {
- return JobErrorHandler.class;
+ private void init(final JobConfiguration jobConfig) {
+ props = jobConfig.getProps();
+ jobErrorHandler = TypedSPILoader.getService(JobErrorHandler.class,
jobConfig.getJobErrorHandlerType(), props);
}
@Override
public void close() {
- Optional.ofNullable(jobErrorHandler).ifPresent(JobErrorHandler::close);
+ jobErrorHandler.close();
}
}
diff --git
a/ecosystem/error-handler/type/general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
b/ecosystem/error-handler/type/general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
deleted file mode 100644
index 0a00a1dce..000000000
---
a/ecosystem/error-handler/type/general/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerReloadable
diff --git
a/ecosystem/error-handler/type/general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java
b/ecosystem/error-handler/type/general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloaderTest.java
similarity index 64%
rename from
ecosystem/error-handler/type/general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java
rename to
ecosystem/error-handler/type/general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloaderTest.java
index 61973a949..faf4a697d 100644
---
a/ecosystem/error-handler/type/general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloadableTest.java
+++
b/ecosystem/error-handler/type/general/src/test/java/org/apache/shardingsphere/elasticjob/error/handler/JobErrorHandlerReloaderTest.java
@@ -32,24 +32,21 @@ import java.util.Properties;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
-class JobErrorHandlerReloadableTest {
+class JobErrorHandlerReloaderTest {
@Mock
private JobErrorHandler jobErrorHandler;
@Test
void assertInitialize() {
- try (JobErrorHandlerReloadable jobErrorHandlerReloadable = new
JobErrorHandlerReloadable()) {
- JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobErrorHandlerType("IGNORE").build();
- assertNull(jobErrorHandlerReloadable.getInstance());
- jobErrorHandlerReloadable.init(jobConfig);
- JobErrorHandler actual = jobErrorHandlerReloadable.getInstance();
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobErrorHandlerType("IGNORE").build();
+ try (JobErrorHandlerReloader jobErrorHandlerReloader = new
JobErrorHandlerReloader(jobConfig)) {
+ JobErrorHandler actual =
jobErrorHandlerReloader.getJobErrorHandler();
assertNotNull(actual);
assertThat(actual.getType(), is("IGNORE"));
assertTrue(actual instanceof IgnoreJobErrorHandler);
@@ -58,15 +55,16 @@ class JobErrorHandlerReloadableTest {
@Test
void assertReload() {
- try (JobErrorHandlerReloadable jobErrorHandlerReloadable = new
JobErrorHandlerReloadable()) {
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobErrorHandlerType("IGNORE").build();
+ try (JobErrorHandlerReloader jobErrorHandlerReloader = new
JobErrorHandlerReloader(jobConfig)) {
when(jobErrorHandler.getType()).thenReturn("mock");
- setField(jobErrorHandlerReloadable, "jobErrorHandler",
jobErrorHandler);
- setField(jobErrorHandlerReloadable, "props", new Properties());
+ setField(jobErrorHandlerReloader, "jobErrorHandler",
jobErrorHandler);
+ setField(jobErrorHandlerReloader, "props", new Properties());
String newJobErrorHandlerType = "LOG";
JobConfiguration newJobConfig = JobConfiguration.newBuilder("job",
1).jobErrorHandlerType(newJobErrorHandlerType).build();
- jobErrorHandlerReloadable.reloadIfNecessary(newJobConfig);
+ jobErrorHandlerReloader.reloadIfNecessary(newJobConfig);
verify(jobErrorHandler).close();
- JobErrorHandler actual = jobErrorHandlerReloadable.getInstance();
+ JobErrorHandler actual =
jobErrorHandlerReloader.getJobErrorHandler();
assertThat(actual.getType(), is(newJobErrorHandlerType));
assertTrue(actual instanceof LogJobErrorHandler);
}
@@ -74,21 +72,21 @@ class JobErrorHandlerReloadableTest {
@Test
void assertUnnecessaryToReload() {
- try (JobErrorHandlerReloadable jobErrorHandlerReloadable = new
JobErrorHandlerReloadable()) {
- JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobErrorHandlerType("IGNORE").build();
- jobErrorHandlerReloadable.init(jobConfig);
- JobErrorHandler expected = jobErrorHandlerReloadable.getInstance();
- jobErrorHandlerReloadable.reloadIfNecessary(jobConfig);
- JobErrorHandler actual = jobErrorHandlerReloadable.getInstance();
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobErrorHandlerType("IGNORE").build();
+ try (JobErrorHandlerReloader jobErrorHandlerReloader = new
JobErrorHandlerReloader(jobConfig)) {
+ JobErrorHandler expected =
jobErrorHandlerReloader.getJobErrorHandler();
+ jobErrorHandlerReloader.reloadIfNecessary(jobConfig);
+ JobErrorHandler actual =
jobErrorHandlerReloader.getJobErrorHandler();
assertThat(actual, is(expected));
}
}
@Test
void assertShutdown() {
- try (JobErrorHandlerReloadable jobErrorHandlerReloadable = new
JobErrorHandlerReloadable()) {
- setField(jobErrorHandlerReloadable, "jobErrorHandler",
jobErrorHandler);
- jobErrorHandlerReloadable.close();
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobErrorHandlerType("IGNORE").build();
+ try (JobErrorHandlerReloader jobErrorHandlerReloader = new
JobErrorHandlerReloader(jobConfig)) {
+ setField(jobErrorHandlerReloader, "jobErrorHandler",
jobErrorHandler);
+ jobErrorHandlerReloader.close();
verify(jobErrorHandler).close();
}
}
diff --git a/ecosystem/executor/kernel/pom.xml
b/ecosystem/executor/kernel/pom.xml
index 6b285ae81..fda6b0f7b 100644
--- a/ecosystem/executor/kernel/pom.xml
+++ b/ecosystem/executor/kernel/pom.xml
@@ -47,5 +47,10 @@
<artifactId>elasticjob-error-handler-general</artifactId>
<version>${project.parent.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git
a/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
index 5c3e7dbb5..817d18229 100644
---
a/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
+++
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/ElasticJobExecutor.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ElasticJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandler;
-import org.apache.shardingsphere.elasticjob.executor.context.ExecutorContext;
+import
org.apache.shardingsphere.elasticjob.error.handler.JobErrorHandlerReloader;
import org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutor;
import
org.apache.shardingsphere.elasticjob.executor.item.JobItemExecutorFactory;
import
org.apache.shardingsphere.elasticjob.executor.item.impl.TypedJobItemExecutor;
@@ -29,6 +29,7 @@ import org.apache.shardingsphere.elasticjob.infra.env.IpUtils;
import org.apache.shardingsphere.elasticjob.infra.exception.ExceptionUtils;
import
org.apache.shardingsphere.elasticjob.infra.exception.JobExecutionEnvironmentException;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
+import
org.apache.shardingsphere.elasticjob.executor.threadpool.ExecutorServiceReloader;
import org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent;
import
org.apache.shardingsphere.elasticjob.tracing.event.JobExecutionEvent.ExecutionSource;
import
org.apache.shardingsphere.elasticjob.tracing.event.JobStatusTraceEvent.State;
@@ -52,7 +53,9 @@ public final class ElasticJobExecutor {
private final JobItemExecutor jobItemExecutor;
- private final ExecutorContext executorContext;
+ private final ExecutorServiceReloader executorServiceReloader;
+
+ private final JobErrorHandlerReloader jobErrorHandlerReloader;
private final Map<Integer, String> itemErrorMessages;
@@ -68,7 +71,9 @@ public final class ElasticJobExecutor {
this.elasticJob = elasticJob;
this.jobFacade = jobFacade;
this.jobItemExecutor = jobItemExecutor;
- executorContext = new
ExecutorContext(jobFacade.loadJobConfiguration(true));
+ JobConfiguration loadedJobConfig =
jobFacade.loadJobConfiguration(true);
+ executorServiceReloader = new ExecutorServiceReloader(loadedJobConfig);
+ jobErrorHandlerReloader = new JobErrorHandlerReloader(loadedJobConfig);
itemErrorMessages = new
ConcurrentHashMap<>(jobConfig.getShardingTotalCount(), 1);
}
@@ -77,8 +82,9 @@ public final class ElasticJobExecutor {
*/
public void execute() {
JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
- executorContext.reloadIfNecessary(jobConfig);
- JobErrorHandler jobErrorHandler =
executorContext.get(JobErrorHandler.class);
+ executorServiceReloader.reloadIfNecessary(jobConfig);
+ jobErrorHandlerReloader.reloadIfNecessary(jobConfig);
+ JobErrorHandler jobErrorHandler =
jobErrorHandlerReloader.getJobErrorHandler();
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
@@ -147,7 +153,7 @@ public final class ElasticJobExecutor {
CountDownLatch latch = new CountDownLatch(items.size());
for (int each : items) {
JobExecutionEvent jobExecutionEvent = new
JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(),
shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
- ExecutorService executorService =
executorContext.get(ExecutorService.class);
+ ExecutorService executorService =
executorServiceReloader.getExecutorService();
if (executorService.isShutdown()) {
return;
}
@@ -182,7 +188,7 @@ public final class ElasticJobExecutor {
completeEvent =
startEvent.executionFailure(ExceptionUtils.transform(cause));
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtils.transform(cause));
- JobErrorHandler jobErrorHandler =
executorContext.get(JobErrorHandler.class);
+ JobErrorHandler jobErrorHandler =
jobErrorHandlerReloader.getJobErrorHandler();
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
@@ -191,6 +197,7 @@ public final class ElasticJobExecutor {
* Shutdown executor.
*/
public void shutdown() {
- executorContext.shutdown();
+ executorServiceReloader.close();
+ jobErrorHandlerReloader.close();
}
}
diff --git
a/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java
deleted file mode 100644
index 858ff744e..000000000
---
a/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/context/ExecutorContext.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.executor.context;
-
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Executor context.
- */
-public final class ExecutorContext {
-
- public ExecutorContext(final JobConfiguration jobConfig) {
- for (Reloadable<?> each :
ShardingSphereServiceLoader.getServiceInstances(Reloadable.class)) {
- each.init(jobConfig);
- }
- }
-
- /**
- * Reload all reloadable item if necessary.
- *
- * @param jobConfig job configuration
- */
- public void reloadIfNecessary(final JobConfiguration jobConfig) {
-
ShardingSphereServiceLoader.getServiceInstances(Reloadable.class).forEach(each
-> each.reloadIfNecessary(jobConfig));
- }
-
- /**
- * Get instance.
- *
- * @param targetClass target class
- * @param <T> target type
- * @return instance
- */
- @SuppressWarnings("unchecked")
- public <T> T get(final Class<T> targetClass) {
- return (T) TypedSPILoader.getService(Reloadable.class, targetClass,
new Properties()).getInstance();
- }
-
- /**
- * Shutdown all closeable instances.
- */
- public void shutdown() {
- for (Reloadable<?> each :
ShardingSphereServiceLoader.getServiceInstances(Reloadable.class)) {
- try {
- each.close();
- } catch (final IOException ignored) {
- }
- }
- }
-}
diff --git
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ElasticJobExecutorService.java
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ElasticJobExecutorService.java
similarity index 97%
rename from
infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ElasticJobExecutorService.java
rename to
ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ElasticJobExecutorService.java
index f361a9595..0dc38cb80 100644
---
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ElasticJobExecutorService.java
+++
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ElasticJobExecutorService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool;
+package org.apache.shardingsphere.elasticjob.executor.threadpool;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
diff --git
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ExecutorServiceReloadable.java
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ExecutorServiceReloader.java
similarity index 59%
rename from
infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ExecutorServiceReloadable.java
rename to
ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ExecutorServiceReloader.java
index 375b2d77c..6c3e77193 100644
---
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ExecutorServiceReloadable.java
+++
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ExecutorServiceReloader.java
@@ -15,60 +15,50 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool;
+package org.apache.shardingsphere.elasticjob.executor.threadpool;
-import lombok.extern.slf4j.Slf4j;
+import lombok.Getter;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
-import org.apache.shardingsphere.elasticjob.infra.context.Reloadable;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import java.util.Optional;
+import java.io.Closeable;
import java.util.concurrent.ExecutorService;
/**
- * Executor service reloadable.
+ * Executor service reloader.
*/
-@Slf4j
-public final class ExecutorServiceReloadable implements
Reloadable<ExecutorService> {
+public final class ExecutorServiceReloader implements Closeable {
private String jobExecutorThreadPoolSizeProviderType;
+ @Getter
private ExecutorService executorService;
- @Override
- public void init(final JobConfiguration jobConfig) {
- JobExecutorThreadPoolSizeProvider jobExecutorThreadPoolSizeProvider =
TypedSPILoader.getService(JobExecutorThreadPoolSizeProvider.class,
jobConfig.getJobExecutorThreadPoolSizeProviderType());
- jobExecutorThreadPoolSizeProviderType =
jobExecutorThreadPoolSizeProvider.getType();
- executorService = new ElasticJobExecutorService("elasticjob-" +
jobConfig.getJobName(),
jobExecutorThreadPoolSizeProvider.getSize()).createExecutorService();
+ public ExecutorServiceReloader(final JobConfiguration jobConfig) {
+ init(jobConfig);
}
- @Override
+ /**
+ * Reload if necessary.
+ *
+ * @param jobConfig job configuration
+ */
public synchronized void reloadIfNecessary(final JobConfiguration
jobConfig) {
if
(jobExecutorThreadPoolSizeProviderType.equals(jobConfig.getJobExecutorThreadPoolSizeProviderType()))
{
return;
}
- log.debug("Reload occurred in the job '{}'. Change from '{}' to
'{}'.", jobConfig.getJobName(), jobExecutorThreadPoolSizeProviderType,
jobConfig.getJobExecutorThreadPoolSizeProviderType());
- reload(jobConfig.getJobExecutorThreadPoolSizeProviderType(),
jobConfig.getJobName());
- }
-
- private void reload(final String jobExecutorThreadPoolSizeProviderType,
final String jobName) {
executorService.shutdown();
- executorService = new ElasticJobExecutorService(
- "elasticjob-" + jobName,
TypedSPILoader.getService(JobExecutorThreadPoolSizeProvider.class,
jobExecutorThreadPoolSizeProviderType).getSize()).createExecutorService();
+ init(jobConfig);
}
- @Override
- public ExecutorService getInstance() {
- return executorService;
+ private void init(final JobConfiguration jobConfig) {
+ JobExecutorThreadPoolSizeProvider jobExecutorThreadPoolSizeProvider =
TypedSPILoader.getService(JobExecutorThreadPoolSizeProvider.class,
jobConfig.getJobExecutorThreadPoolSizeProviderType());
+ jobExecutorThreadPoolSizeProviderType =
jobExecutorThreadPoolSizeProvider.getType();
+ executorService = new ElasticJobExecutorService("elasticjob-" +
jobConfig.getJobName(),
jobExecutorThreadPoolSizeProvider.getSize()).createExecutorService();
}
@Override
public void close() {
-
Optional.ofNullable(executorService).ifPresent(ExecutorService::shutdown);
- }
-
- @Override
- public Class<ExecutorService> getType() {
- return ExecutorService.class;
+ executorService.shutdown();
}
}
diff --git
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/JobExecutorThreadPoolSizeProvider.java
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/JobExecutorThreadPoolSizeProvider.java
similarity index 94%
rename from
infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/JobExecutorThreadPoolSizeProvider.java
rename to
ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/JobExecutorThreadPoolSizeProvider.java
index 009c4f1b5..a2e29883b 100644
---
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/JobExecutorThreadPoolSizeProvider.java
+++
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/JobExecutorThreadPoolSizeProvider.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool;
+package org.apache.shardingsphere.elasticjob.executor.threadpool;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
diff --git
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProvider.java
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProvider.java
similarity index 88%
rename from
infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProvider.java
rename to
ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProvider.java
index 1dfe249ed..1903142c9 100644
---
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProvider.java
+++
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProvider.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool.type;
+package org.apache.shardingsphere.elasticjob.executor.threadpool.type;
-import
org.apache.shardingsphere.elasticjob.infra.threadpool.JobExecutorThreadPoolSizeProvider;
+import
org.apache.shardingsphere.elasticjob.executor.threadpool.JobExecutorThreadPoolSizeProvider;
/**
* Job executor pool size provider with use CPU available processors.
diff --git
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProvider.java
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProvider.java
similarity index 87%
rename from
infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProvider.java
rename to
ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProvider.java
index 0539262f7..1248e3057 100644
---
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProvider.java
+++
b/ecosystem/executor/kernel/src/main/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProvider.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool.type;
+package org.apache.shardingsphere.elasticjob.executor.threadpool.type;
-import
org.apache.shardingsphere.elasticjob.infra.threadpool.JobExecutorThreadPoolSizeProvider;
+import
org.apache.shardingsphere.elasticjob.executor.threadpool.JobExecutorThreadPoolSizeProvider;
/**
* Job executor pool size provider with single thread.
diff --git
a/infra/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.threadpool.JobExecutorThreadPoolSizeProvider
b/ecosystem/executor/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.executor.threadpool.JobExecutorThreadPoolSizeProvider
similarity index 78%
rename from
infra/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.threadpool.JobExecutorThreadPoolSizeProvider
rename to
ecosystem/executor/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.executor.threadpool.JobExecutorThreadPoolSizeProvider
index f8f1d2c7c..40f0e643d 100644
---
a/infra/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.threadpool.JobExecutorThreadPoolSizeProvider
+++
b/ecosystem/executor/kernel/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.executor.threadpool.JobExecutorThreadPoolSizeProvider
@@ -15,5 +15,5 @@
# limitations under the License.
#
-org.apache.shardingsphere.elasticjob.infra.threadpool.type.CPUUsageJobExecutorThreadPoolSizeProvider
-org.apache.shardingsphere.elasticjob.infra.threadpool.type.SingleThreadJobExecutorThreadPoolSizeProvider
+org.apache.shardingsphere.elasticjob.executor.threadpool.type.CPUUsageJobExecutorThreadPoolSizeProvider
+org.apache.shardingsphere.elasticjob.executor.threadpool.type.SingleThreadJobExecutorThreadPoolSizeProvider
diff --git
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ElasticJobExecutorServiceTest.java
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ElasticJobExecutorServiceTest.java
similarity index 93%
rename from
infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ElasticJobExecutorServiceTest.java
rename to
ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ElasticJobExecutorServiceTest.java
index a03878a90..293bfbf62 100644
---
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ElasticJobExecutorServiceTest.java
+++
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ElasticJobExecutorServiceTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool;
+package org.apache.shardingsphere.elasticjob.executor.threadpool;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
@@ -61,8 +61,7 @@ class ElasticJobExecutorServiceTest {
@Override
public void run() {
- Awaitility.await().atMost(1L, TimeUnit.MINUTES)
- .untilAsserted(() -> assertThat(hasExecuted, is(true)));
+ Awaitility.await().atMost(1L, TimeUnit.MINUTES).untilAsserted(()
-> assertThat(hasExecuted, is(true)));
}
}
}
diff --git
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ExecutorServiceReloadableTest.java
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ExecutorServiceReloaderTest.java
similarity index 60%
rename from
infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ExecutorServiceReloadableTest.java
rename to
ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ExecutorServiceReloaderTest.java
index ef6b11273..92f54d877 100644
---
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/ExecutorServiceReloadableTest.java
+++
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/ExecutorServiceReloaderTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool;
+package org.apache.shardingsphere.elasticjob.executor.threadpool;
import lombok.SneakyThrows;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
@@ -31,22 +31,19 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.verify;
@ExtendWith(MockitoExtension.class)
-class ExecutorServiceReloadableTest {
+class ExecutorServiceReloaderTest {
@Mock
private ExecutorService mockExecutorService;
@Test
void assertInitialize() {
- try (ExecutorServiceReloadable executorServiceReloadable = new
ExecutorServiceReloadable()) {
- JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobExecutorThreadPoolSizeProviderType("SINGLE_THREAD").build();
- assertNull(executorServiceReloadable.getInstance());
- executorServiceReloadable.init(jobConfig);
- ExecutorService actual = executorServiceReloadable.getInstance();
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobExecutorThreadPoolSizeProviderType("SINGLE_THREAD").build();
+ try (ExecutorServiceReloader executorServiceReloader = new
ExecutorServiceReloader(jobConfig)) {
+ ExecutorService actual =
executorServiceReloader.getExecutorService();
assertNotNull(actual);
assertFalse(actual.isShutdown());
assertFalse(actual.isTerminated());
@@ -56,13 +53,13 @@ class ExecutorServiceReloadableTest {
@Test
void assertReload() {
- ExecutorServiceReloadable executorServiceReloadable = new
ExecutorServiceReloadable();
- setField(executorServiceReloadable,
"jobExecutorThreadPoolSizeProviderType", "mock");
- setField(executorServiceReloadable, "executorService",
mockExecutorService);
+ ExecutorServiceReloader executorServiceReloader = new
ExecutorServiceReloader(JobConfiguration.newBuilder("job",
1).jobExecutorThreadPoolSizeProviderType("SINGLE_THREAD").build());
+ setField(executorServiceReloader,
"jobExecutorThreadPoolSizeProviderType", "mock");
+ setField(executorServiceReloader, "executorService",
mockExecutorService);
JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).build();
- executorServiceReloadable.reloadIfNecessary(jobConfig);
+ executorServiceReloader.reloadIfNecessary(jobConfig);
verify(mockExecutorService).shutdown();
- ExecutorService actual = executorServiceReloadable.getInstance();
+ ExecutorService actual = executorServiceReloader.getExecutorService();
assertFalse(actual.isShutdown());
assertFalse(actual.isTerminated());
actual.shutdown();
@@ -70,12 +67,11 @@ class ExecutorServiceReloadableTest {
@Test
void assertUnnecessaryToReload() {
- try (ExecutorServiceReloadable executorServiceReloadable = new
ExecutorServiceReloadable()) {
- JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobExecutorThreadPoolSizeProviderType("CPU").build();
- executorServiceReloadable.init(jobConfig);
- ExecutorService expected = executorServiceReloadable.getInstance();
- executorServiceReloadable.reloadIfNecessary(jobConfig);
- ExecutorService actual = executorServiceReloadable.getInstance();
+ JobConfiguration jobConfig = JobConfiguration.newBuilder("job",
1).jobExecutorThreadPoolSizeProviderType("CPU").build();
+ try (ExecutorServiceReloader executorServiceReloader = new
ExecutorServiceReloader(jobConfig)) {
+ ExecutorService expected =
executorServiceReloader.getExecutorService();
+ executorServiceReloader.reloadIfNecessary(jobConfig);
+ ExecutorService actual =
executorServiceReloader.getExecutorService();
assertThat(actual, is(expected));
actual.shutdown();
}
@@ -83,9 +79,9 @@ class ExecutorServiceReloadableTest {
@Test
void assertShutdown() {
- ExecutorServiceReloadable executorServiceReloadable = new
ExecutorServiceReloadable();
- setField(executorServiceReloadable, "executorService",
mockExecutorService);
- executorServiceReloadable.close();
+ ExecutorServiceReloader executorServiceReloader = new
ExecutorServiceReloader(JobConfiguration.newBuilder("job",
1).jobExecutorThreadPoolSizeProviderType("SINGLE_THREAD").build());
+ setField(executorServiceReloader, "executorService",
mockExecutorService);
+ executorServiceReloader.close();
verify(mockExecutorService).shutdown();
}
diff --git
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProviderTest.java
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProviderTest.java
similarity index 88%
rename from
infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProviderTest.java
rename to
ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProviderTest.java
index ba613c3e9..50b61a864 100644
---
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProviderTest.java
+++
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/CPUUsageJobExecutorThreadPoolSizeProviderTest.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool.type;
+package org.apache.shardingsphere.elasticjob.executor.threadpool.type;
-import
org.apache.shardingsphere.elasticjob.infra.threadpool.JobExecutorThreadPoolSizeProvider;
+import
org.apache.shardingsphere.elasticjob.executor.threadpool.JobExecutorThreadPoolSizeProvider;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
diff --git
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProviderTest.java
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProviderTest.java
similarity index 87%
rename from
infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProviderTest.java
rename to
ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProviderTest.java
index af7efe5a7..97119b5ce 100644
---
a/infra/src/test/java/org/apache/shardingsphere/elasticjob/infra/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProviderTest.java
+++
b/ecosystem/executor/kernel/src/test/java/org/apache/shardingsphere/elasticjob/executor/threadpool/type/SingleThreadJobExecutorThreadPoolSizeProviderTest.java
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.elasticjob.infra.threadpool.type;
+package org.apache.shardingsphere.elasticjob.executor.threadpool.type;
-import
org.apache.shardingsphere.elasticjob.infra.threadpool.JobExecutorThreadPoolSizeProvider;
+import
org.apache.shardingsphere.elasticjob.executor.threadpool.JobExecutorThreadPoolSizeProvider;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
diff --git
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
b/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
deleted file mode 100644
index f0a5d5218..000000000
---
a/infra/src/main/java/org/apache/shardingsphere/elasticjob/infra/context/Reloadable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.elasticjob.infra.context;
-
-import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
-import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
-
-import java.io.Closeable;
-
-/**
- * Reloadable.
- *
- * @param <T> reload target
- */
-@SingletonSPI
-public interface Reloadable<T> extends TypedSPI, Closeable {
-
- /**
- * Initialize reloadable.
- *
- * @param jobConfig job configuration
- */
- void init(JobConfiguration jobConfig);
-
- /**
- * Reload if necessary.
- *
- * @param jobConfig job configuration
- */
- void reloadIfNecessary(JobConfiguration jobConfig);
-
- /**
- * Get target instance.
- *
- * @return instance
- */
- T getInstance();
-
- @Override
- Class<T> getType();
-}
diff --git
a/infra/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
b/infra/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
deleted file mode 100644
index 6a5c61ca1..000000000
---
a/infra/src/main/resources/META-INF/services/org.apache.shardingsphere.elasticjob.infra.context.Reloadable
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.elasticjob.infra.threadpool.ExecutorServiceReloadable