This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 28814e9e068 Remove TransmissionJobOption.extendYamlJobConfiguration() (#29242)
28814e9e068 is described below
commit 28814e9e0682a77bcb6ad81ab096b917fafd9d64
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 1 00:03:22 2023 +0800
Remove TransmissionJobOption.extendYamlJobConfiguration() (#29242)
---
.../core/job/option/TransmissionJobOption.java | 10 ----------
.../data/pipeline/cdc/CDCJobOption.java | 19 +------------------
.../data/pipeline/cdc/api/CDCJobAPI.java | 12 ++++++++----
.../scenario/migration/MigrationJobOption.java | 11 -----------
.../scenario/migration/api/MigrationJobAPI.java | 7 +++----
.../pipeline/core/util/JobConfigurationBuilder.java | 4 ----
6 files changed, 12 insertions(+), 51 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index a9a630ec205..2564468586f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -18,9 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job.option;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -47,14 +45,6 @@ public interface TransmissionJobOption extends
PipelineJobOption {
*/
PipelineJobInfo getJobInfo(String jobId);
- /**
- * Extend YAML job configuration.
- *
- * @param contextKey context key
- * @param yamlJobConfig YAML job configuration
- */
- void extendYamlJobConfiguration(PipelineContextKey contextKey, YamlPipelineJobConfiguration yamlJobConfig);
-
/**
* Build task configuration.
*
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index 87ddb7c0ed5..081e86e8247 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -17,20 +17,16 @@
package org.apache.shardingsphere.data.pipeline.cdc;
-import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
@@ -84,19 +80,6 @@ public final class CDCJobOption implements
TransmissionJobOption {
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
}
- @Override
- public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
- YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) yamlJobConfig;
- if (null == yamlJobConfig.getJobId()) {
- config.setJobId(new CDCJobId(contextKey, config.getSchemaTableNames(), config.isFull(), config.getSinkConfig().getSinkType()).marshal());
- }
- if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
- PipelineDataSourceConfiguration sourceDataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
- config.getDataSourceConfiguration().getParameter());
- config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
- }
- }
-
@Override
public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration)
pipelineJobConfig;
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 80b8c0b9f97..71c58442b9b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -20,19 +20,21 @@ package org.apache.shardingsphere.data.pipeline.cdc.api;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration.YamlSinkConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
@@ -49,10 +51,10 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -113,7 +115,6 @@ public final class CDCJobAPI implements TransmissionJobAPI {
public String create(final StreamDataParameter param, final CDCSinkType
sinkType, final Properties sinkProps) {
PipelineContextKey contextKey = new
PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
YamlCDCJobConfiguration yamlJobConfig =
getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
- jobOption.extendYamlJobConfiguration(contextKey, yamlJobConfig);
CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
ShardingSpherePreconditions.checkState(0 !=
jobConfig.getJobShardingCount(), () -> new
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
@@ -133,6 +134,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final
StreamDataParameter param, final CDCSinkType sinkType, final Properties
sinkProps, final PipelineContextKey contextKey) {
YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
+ result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(), param.isFull(), sinkType.name()).marshal());
result.setDatabaseName(param.getDatabaseName());
result.setSchemaTableNames(param.getSchemaTableNames());
result.setFull(param.isFull());
@@ -148,6 +150,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
JobDataNodeLine tableFirstDataNodes = new
JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
.map(each -> new JobDataNodeEntry(each.getKey(),
each.getValue().subList(0, 1))).collect(Collectors.toList()));
result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
+ result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance(
+ result.getDataSourceConfiguration().getType(), result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType());
return result;
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 68888ab63b3..d2554aed14b 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -23,9 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDa
import
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
@@ -47,7 +45,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consiste
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
@@ -97,14 +94,6 @@ public final class MigrationJobOption implements
TransmissionJobOption {
return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
}
- @Override
- public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
- YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
- if (null == yamlJobConfig.getJobId()) {
- config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal());
- }
- }
-
-
@Override
public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 2b0beae44af..84e63ee2345 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -45,6 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOp
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
@@ -92,8 +93,6 @@ import java.util.stream.Collectors;
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {
- private final TransmissionJobOption jobOption;
-
private final PipelineJobManager jobManager;
private final PipelineJobConfigurationManager jobConfigManager;
@@ -101,7 +100,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
private final PipelineDataSourcePersistService dataSourcePersistService;
public MigrationJobAPI() {
- jobOption = new MigrationJobOption();
+ TransmissionJobOption jobOption = new MigrationJobOption();
jobManager = new PipelineJobManager(jobOption);
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
dataSourcePersistService = new PipelineDataSourcePersistService();
@@ -164,7 +163,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new
JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- jobOption.extendYamlJobConfiguration(contextKey, result);
+ result.setJobId(new MigrationJobId(contextKey, result.getJobShardingDataNodes()).marshal());
return result;
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index e93a0531eff..e39c149bd21 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,15 +27,12 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.test.util.ConfigurationFileUtils;
import java.sql.Connection;
@@ -96,7 +93,6 @@ public final class JobConfigurationBuilder {
result.setSources(sources);
result.setTarget(createYamlPipelineDataSourceConfiguration(new
ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}",
databaseNameSuffix))));
- ((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption()).extendYamlJobConfiguration(contextKey, result);
return result;
}