This is an automated email from the ASF dual-hosted git repository.
zihaoxiang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d1fddfbd72 [Chore] Merge the DataSourcePluginManager and DataSourceProcessorManager (#17975)
d1fddfbd72 is described below
commit d1fddfbd7277c01996e64d7c99042d25ca54beb1
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Feb 25 14:33:24 2026 +0800
[Chore] Merge the DataSourcePluginManager and DataSourceProcessorManager (#17975)
---
.../dolphinscheduler/api/ApiApplicationServer.java | 4 +-
.../api/client/BaseAdHocDataSourceClient.java | 4 +-
.../api/plugin/DataSourceClientProvider.java | 9 +--
.../api/plugin/DataSourcePluginManager.java | 67 +++++++++++++++-------
.../api/plugin/DataSourceProcessorManager.java | 58 -------------------
.../api/plugin/DataSourceProcessorProvider.java | 52 -----------------
.../datasource/api/utils/DataSourceUtils.java | 10 +---
.../param/DatabendDataSourceProcessorTest.java | 5 +-
.../server/master/MasterServer.java | 4 +-
.../plugin/task/procedure/ProcedureTask.java | 4 +-
.../dolphinscheduler/plugin/task/sql/SqlTask.java | 4 +-
.../server/worker/WorkerServer.java | 4 +-
12 files changed, 64 insertions(+), 161 deletions(-)
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
index 8a8a8cc102..68d876fa2f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java
@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.CommonConfiguration;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -57,7 +57,7 @@ public class ApiApplicationServer {
public void run(ApplicationReadyEvent readyEvent) {
ServerLifeCycleManager.toRunning();
log.info("Received spring application context ready event will load
taskPlugin and write to DB");
- DataSourceProcessorProvider.initialize();
+ DataSourcePluginManager.loadDataSourcePlugin();
TaskPluginManager.loadTaskPlugin();
}
}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/BaseAdHocDataSourceClient.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/BaseAdHocDataSourceClient.java
index fcede5b44d..d22f40f94e 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/BaseAdHocDataSourceClient.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/client/BaseAdHocDataSourceClient.java
@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.plugin.datasource.api.client;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.spi.datasource.AdHocDataSourceClient;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -38,7 +38,7 @@ public abstract class BaseAdHocDataSourceClient implements
AdHocDataSourceClient
@Override
public Connection getConnection() throws SQLException {
try {
- return
DataSourceProcessorProvider.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
+ return
DataSourcePluginManager.getDataSourceProcessor(dbType).getConnection(baseConnectionParam);
} catch (Exception e) {
throw new SQLException("Create adhoc connection error", e);
}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
index caae549f03..d8bf74d8ba 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
@@ -57,18 +57,13 @@ public class DataSourceClientProvider {
})
.maximumSize(100)
.build();
- private static final DataSourcePluginManager dataSourcePluginManager = new
DataSourcePluginManager();
-
- static {
- dataSourcePluginManager.installPlugin();
- }
public static DataSourceClient getPooledDataSourceClient(DbType dbType,
ConnectionParam
connectionParam) throws ExecutionException {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam)
connectionParam;
String datasourceUniqueId =
DataSourceUtils.getDatasourceUniqueId(baseConnectionParam, dbType);
return POOLED_DATASOURCE_CLIENT_CACHE.get(datasourceUniqueId, () -> {
- DataSourceChannel dataSourceChannel =
dataSourcePluginManager.getDataSourceChannel(dbType);
+ DataSourceChannel dataSourceChannel =
DataSourcePluginManager.getDataSourceChannel(dbType);
if (null == dataSourceChannel) {
throw new RuntimeException(String.format("datasource plugin
'%s' is not found", dbType.getName()));
}
@@ -83,7 +78,7 @@ public class DataSourceClientProvider {
public static AdHocDataSourceClient getAdHocDataSourceClient(DbType
dbType, ConnectionParam connectionParam) {
BaseConnectionParam baseConnectionParam = (BaseConnectionParam)
connectionParam;
- DataSourceChannel dataSourceChannel =
dataSourcePluginManager.getDataSourceChannel(dbType);
+ DataSourceChannel dataSourceChannel =
DataSourcePluginManager.getDataSourceChannel(dbType);
if (null == dataSourceChannel) {
throw new RuntimeException(String.format("datasource plugin '%s'
is not found", dbType.getName()));
}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
index 1bf4e7c5cc..479ad7b7d6 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourcePluginManager.java
@@ -19,47 +19,74 @@ package
org.apache.dolphinscheduler.plugin.datasource.api.plugin;
import static java.lang.String.format;
+import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannel;
import org.apache.dolphinscheduler.spi.datasource.DataSourceChannelFactory;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.dolphinscheduler.spi.plugin.PrioritySPIFactory;
+import org.apache.commons.collections4.MapUtils;
+
import java.util.Map;
+import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
+import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DataSourcePluginManager {
- private final Map<String, DataSourceChannel> datasourceChannelMap = new
ConcurrentHashMap<>();
-
- public DataSourceChannel getDataSourceChannel(final DbType dbType) {
- return datasourceChannelMap.get(dbType.getName());
- }
+ private static final Map<String, DataSourceChannel> datasourceChannelMap =
new ConcurrentHashMap<>();
- public void installPlugin() {
+ private static final Map<String, DataSourceProcessor>
dataSourceProcessorMap = new ConcurrentHashMap<>();
- PrioritySPIFactory<DataSourceChannelFactory> prioritySPIFactory =
- new PrioritySPIFactory<>(DataSourceChannelFactory.class);
- for (Map.Entry<String, DataSourceChannelFactory> entry :
prioritySPIFactory.getSPIMap().entrySet()) {
- final DataSourceChannelFactory factory = entry.getValue();
- final String name = entry.getKey();
+ static {
+ loadDataSourcePlugin();
+ }
- log.info("Registering datasource plugin: {}", name);
+ public static DataSourceChannel getDataSourceChannel(@NonNull DbType
dbType) {
+ return datasourceChannelMap.get(dbType.getName());
+ }
- if (datasourceChannelMap.containsKey(name)) {
- throw new IllegalStateException(format("Duplicate datasource
plugins named '%s'", name));
- }
+ public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType
dbType) {
+ return dataSourceProcessorMap.get(dbType.getName());
+ }
- loadDatasourceClient(factory);
+ public static void loadDataSourcePlugin() {
+ initializeDataSourceChannel();
+ initializeDataSourceProcessor();
+ }
- log.info("Registered datasource plugin: {}", name);
+ private static synchronized void initializeDataSourceChannel() {
+ if (MapUtils.isNotEmpty(datasourceChannelMap)) {
+ return;
}
+ new
PrioritySPIFactory<>(DataSourceChannelFactory.class).getSPIMap().forEach(
+ (dataSourceChannelName, dataSourceChannelFactory) -> {
+ if
(datasourceChannelMap.containsKey(dataSourceChannelName)) {
+ throw new IllegalStateException(
+ format("Duplicate datasource channel named
'%s'", dataSourceChannelName));
+ }
+ datasourceChannelMap.put(dataSourceChannelName,
dataSourceChannelFactory.create());
+ log.info("Registered datasource channel: {}",
dataSourceChannelName);
+ });
}
- private void loadDatasourceClient(DataSourceChannelFactory
datasourceChannelFactory) {
- DataSourceChannel datasourceChannel =
datasourceChannelFactory.create();
- datasourceChannelMap.put(datasourceChannelFactory.getName(),
datasourceChannel);
+ private static synchronized void initializeDataSourceProcessor() {
+ if (MapUtils.isNotEmpty(dataSourceProcessorMap)) {
+ return;
+ }
+
+ ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
+ final String name = factory.getDbType().getName();
+ if (dataSourceProcessorMap.containsKey(name)) {
+ throw new IllegalStateException(format("Duplicate datasource
processor named '%s'", name));
+ }
+ DataSourceProcessor dataSourceProcessor = factory.create();
+ dataSourceProcessorMap.put(name, dataSourceProcessor);
+ log.info("Success register datasource processor -> {}", name);
+ });
}
+
}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java
deleted file mode 100644
index da7376098e..0000000000
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
-
-import static java.lang.String.format;
-
-import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class DataSourceProcessorManager {
-
- private static final Map<String, DataSourceProcessor>
dataSourceProcessorMap = new ConcurrentHashMap<>();
-
- public Map<String, DataSourceProcessor> getDataSourceProcessorMap() {
- return Collections.unmodifiableMap(dataSourceProcessorMap);
- }
-
- public void installProcessor() {
-
- ServiceLoader.load(DataSourceProcessor.class).forEach(factory -> {
- final String name = factory.getDbType().name();
-
- if (dataSourceProcessorMap.containsKey(name)) {
- throw new IllegalStateException(format("Duplicate datasource
plugins named '%s'", name));
- }
- loadDatasourceClient(factory);
- log.info("Success register datasource plugin -> {}", name);
-
- });
- }
-
- private void loadDatasourceClient(DataSourceProcessor processor) {
- DataSourceProcessor instance = processor.create();
- dataSourceProcessorMap.put(processor.getDbType().name(), instance);
- }
-}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
deleted file mode 100644
index 973421615f..0000000000
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceProcessorProvider.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.plugin.datasource.api.plugin;
-
-import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
-import org.apache.dolphinscheduler.spi.enums.DbType;
-
-import java.util.Map;
-
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class DataSourceProcessorProvider {
-
- private static final DataSourceProcessorManager dataSourcePluginManager =
new DataSourceProcessorManager();
-
- static {
- dataSourcePluginManager.installProcessor();
- }
-
- private DataSourceProcessorProvider() {
- }
-
- public static void initialize() {
- log.info("Initialize DataSourceProcessorProvider");
- }
-
- public static DataSourceProcessor getDataSourceProcessor(@NonNull DbType
dbType) {
- return
dataSourcePluginManager.getDataSourceProcessorMap().get(dbType.name());
- }
-
- public static Map<String, DataSourceProcessor> getDataSourceProcessorMap()
{
- return dataSourcePluginManager.getDataSourceProcessorMap();
- }
-
-}
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
index 20e80d945f..69a05c83ac 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/DataSourceUtils.java
@@ -20,12 +20,11 @@ package
org.apache.dolphinscheduler.plugin.datasource.api.utils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.BaseDataSourceParamDTO;
import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import java.sql.Connection;
-import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@@ -75,12 +74,7 @@ public class DataSourceUtils {
}
public static DataSourceProcessor getDatasourceProcessor(DbType dbType) {
- Map<String, DataSourceProcessor> dataSourceProcessorMap =
- DataSourceProcessorProvider.getDataSourceProcessorMap();
- if (!dataSourceProcessorMap.containsKey(dbType.name())) {
- throw new IllegalArgumentException("illegal datasource type");
- }
- return dataSourceProcessorMap.get(dbType.name());
+ return DataSourcePluginManager.getDataSourceProcessor(dbType);
}
/**
diff --git
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-databend/src/test/java/org/apache/dolphinscheduler/plugin/datasource/databend/param/DatabendDataSourceProcessorTest.java
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-databend/src/test/java/org/apache/dolphinscheduler/plugin/datasource/databend/param/DatabendDataSourceProcessorTest.java
index cb41c6562b..60903f4b97 100644
---
a/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-databend/src/test/java/org/apache/dolphinscheduler/plugin/datasource/databend/param/DatabendDataSourceProcessorTest.java
+++
b/dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-databend/src/test/java/org/apache/dolphinscheduler/plugin/datasource/databend/param/DatabendDataSourceProcessorTest.java
@@ -30,15 +30,12 @@ import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-@ExtendWith(MockitoExtension.class)
public class DatabendDataSourceProcessorTest {
- private DatabendDataSourceProcessor databendDataSourceProcessor = new
DatabendDataSourceProcessor();
+ private final DatabendDataSourceProcessor databendDataSourceProcessor =
new DatabendDataSourceProcessor();
@Test
public void testCheckDatasourceParam() {
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 3a23ddf881..1b5654a797 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -26,7 +26,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.DaoConfiguration;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -123,7 +123,7 @@ public class MasterServer implements IStoppable {
// install task plugin
TaskPluginManager.loadTaskPlugin();
- DataSourceProcessorProvider.initialize();
+ DataSourcePluginManager.loadDataSourcePlugin();
// self tolerant
this.masterRegistryClient.start();
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
index 0b47c20eb0..70b1042168 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
@@ -23,7 +23,7 @@ import static
org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import
org.apache.dolphinscheduler.plugin.datasource.api.datasource.DataSourceProcessor;
import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
@@ -95,7 +95,7 @@ public class ProcedureTask extends AbstractTask {
procedureParameters.getLocalParams());
DbType dbType = DbType.valueOf(procedureParameters.getType());
- DataSourceProcessor dataSourceProcessor =
DataSourceProcessorProvider.getDataSourceProcessor(dbType);
+ DataSourceProcessor dataSourceProcessor =
DataSourcePluginManager.getDataSourceProcessor(dbType);
ConnectionParam connectionParams =
dataSourceProcessor.createConnectionParams(procedureTaskExecutionContext.getConnectionParams());
try (Connection connection =
DataSourceClientProvider.getAdHocConnection(dbType, connectionParams)) {
diff --git
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
index 44296bc331..33b85e1a66 100644
---
a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
+++
b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.sql;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.SQLTaskExecutionContext;
@@ -131,7 +131,7 @@ public class SqlTask extends AbstractTask {
// get datasource
baseConnectionParam = (BaseConnectionParam)
DataSourceUtils.buildConnectionParams(dbType,
sqlTaskExecutionContext.getConnectionParams());
- List<String> subSqls =
DataSourceProcessorProvider.getDataSourceProcessor(dbType)
+ List<String> subSqls =
DataSourcePluginManager.getDataSourceProcessor(dbType)
.splitAndRemoveComment(sqlParameters.getSql());
// ready to execute SQL and parameter entity Map
diff --git
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index 26c847ca21..13c2f2aaac 100644
---
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -25,7 +25,7 @@ import
org.apache.dolphinscheduler.common.thread.DefaultUncaughtExceptionHandler
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
-import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceProcessorProvider;
+import
org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourcePluginManager;
import org.apache.dolphinscheduler.plugin.storage.api.StorageConfiguration;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.registry.api.RegistryConfiguration;
@@ -82,7 +82,7 @@ public class WorkerServer implements IStoppable {
TaskPluginManager.loadTaskPlugin();
- DataSourceProcessorProvider.initialize();
+ DataSourcePluginManager.loadDataSourcePlugin();
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();