This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 28814e9e068 Remove TransmissionJobOption.extendYamlJobConfiguration() (#29242)
28814e9e068 is described below

commit 28814e9e0682a77bcb6ad81ab096b917fafd9d64
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 1 00:03:22 2023 +0800

    Remove TransmissionJobOption.extendYamlJobConfiguration() (#29242)
---
 .../core/job/option/TransmissionJobOption.java        | 10 ----------
 .../data/pipeline/cdc/CDCJobOption.java               | 19 +------------------
 .../data/pipeline/cdc/api/CDCJobAPI.java              | 12 ++++++++----
 .../scenario/migration/MigrationJobOption.java        | 11 -----------
 .../scenario/migration/api/MigrationJobAPI.java       |  7 +++----
 .../pipeline/core/util/JobConfigurationBuilder.java   |  4 ----
 6 files changed, 12 insertions(+), 51 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index a9a630ec205..2564468586f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -18,9 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.job.option;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -47,14 +45,6 @@ public interface TransmissionJobOption extends PipelineJobOption {
      */
     PipelineJobInfo getJobInfo(String jobId);
     
-    /**
-     * Extend YAML job configuration.
-     *
-     * @param contextKey context key
-     * @param yamlJobConfig YAML job configuration
-     */
-    void extendYamlJobConfiguration(PipelineContextKey contextKey, 
YamlPipelineJobConfiguration yamlJobConfig);
-    
     /**
      * Build task configuration.
      *
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index 87ddb7c0ed5..081e86e8247 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -17,20 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc;
 
-import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
@@ -84,19 +80,6 @@ public final class CDCJobOption implements TransmissionJobOption {
         return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
-    @Override
-    public void extendYamlJobConfiguration(final PipelineContextKey 
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
-        YamlCDCJobConfiguration config = (YamlCDCJobConfiguration) 
yamlJobConfig;
-        if (null == yamlJobConfig.getJobId()) {
-            config.setJobId(new CDCJobId(contextKey, 
config.getSchemaTableNames(), config.isFull(), 
config.getSinkConfig().getSinkType()).marshal());
-        }
-        if (Strings.isNullOrEmpty(config.getSourceDatabaseType())) {
-            PipelineDataSourceConfiguration sourceDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(config.getDataSourceConfiguration().getType(),
-                    config.getDataSourceConfiguration().getParameter());
-            
config.setSourceDatabaseType(sourceDataSourceConfig.getDatabaseType().getType());
-        }
-    }
-    
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
pipelineJobConfig;
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 80b8c0b9f97..71c58442b9b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -20,19 +20,21 @@ package org.apache.shardingsphere.data.pipeline.cdc.api;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
+import org.apache.shardingsphere.data.pipeline.cdc.CDCJobId;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJob;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
@@ -49,10 +51,10 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
@@ -113,7 +115,6 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     public String create(final StreamDataParameter param, final CDCSinkType 
sinkType, final Properties sinkProps) {
         PipelineContextKey contextKey = new 
PipelineContextKey(param.getDatabaseName(), InstanceType.PROXY);
         YamlCDCJobConfiguration yamlJobConfig = 
getYamlCDCJobConfiguration(param, sinkType, sinkProps, contextKey);
-        jobOption.extendYamlJobConfiguration(contextKey, yamlJobConfig);
         CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(yamlJobConfig);
         ShardingSpherePreconditions.checkState(0 != 
jobConfig.getJobShardingCount(), () -> new 
PipelineJobCreationWithInvalidShardingCountException(jobConfig.getJobId()));
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
@@ -133,6 +134,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     
     private YamlCDCJobConfiguration getYamlCDCJobConfiguration(final 
StreamDataParameter param, final CDCSinkType sinkType, final Properties 
sinkProps, final PipelineContextKey contextKey) {
         YamlCDCJobConfiguration result = new YamlCDCJobConfiguration();
+        result.setJobId(new CDCJobId(contextKey, param.getSchemaTableNames(), 
param.isFull(), sinkType.name()).marshal());
         result.setDatabaseName(param.getDatabaseName());
         result.setSchemaTableNames(param.getSchemaTableNames());
         result.setFull(param.isFull());
@@ -148,6 +150,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         JobDataNodeLine tableFirstDataNodes = new 
JobDataNodeLine(param.getDataNodesMap().entrySet().stream()
                 .map(each -> new JobDataNodeEntry(each.getKey(), 
each.getValue().subList(0, 1))).collect(Collectors.toList()));
         result.setTablesFirstDataNodes(tableFirstDataNodes.marshal());
+        
result.setSourceDatabaseType(PipelineDataSourceConfigurationFactory.newInstance(
+                result.getDataSourceConfiguration().getType(), 
result.getDataSourceConfiguration().getParameter()).getDatabaseType().getType());
         return result;
     }
     
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 68888ab63b3..d2554aed14b 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -23,9 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDa
 import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
@@ -47,7 +45,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consiste
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
@@ -97,14 +94,6 @@ public final class MigrationJobOption implements TransmissionJobOption {
         return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
     }
     
-    @Override
-    public void extendYamlJobConfiguration(final PipelineContextKey 
contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
-        YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) 
yamlJobConfig;
-        if (null == yamlJobConfig.getJobId()) {
-            config.setJobId(new MigrationJobId(contextKey, 
config.getJobShardingDataNodes()).marshal());
-        }
-    }
-    
     @Override
     public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 2b0beae44af..84e63ee2345 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -45,6 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOp
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
@@ -92,8 +93,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class MigrationJobAPI implements TransmissionJobAPI {
     
-    private final TransmissionJobOption jobOption;
-    
     private final PipelineJobManager jobManager;
     
     private final PipelineJobConfigurationManager jobConfigManager;
@@ -101,7 +100,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
     private final PipelineDataSourcePersistService dataSourcePersistService;
     
     public MigrationJobAPI() {
-        jobOption = new MigrationJobOption();
+        TransmissionJobOption jobOption = new MigrationJobOption();
         jobManager = new PipelineJobManager(jobOption);
         jobConfigManager = new PipelineJobConfigurationManager(jobOption);
         dataSourcePersistService = new PipelineDataSourcePersistService();
@@ -164,7 +163,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
         
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
         result.setTablesFirstDataNodes(new 
JobDataNodeLine(tablesFirstDataNodes).marshal());
         
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
-        jobOption.extendYamlJobConfiguration(contextKey, result);
+        result.setJobId(new MigrationJobId(contextKey, 
result.getJobShardingDataNodes()).marshal());
         return result;
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index e93a0531eff..e39c149bd21 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,15 +27,12 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.test.util.ConfigurationFileUtils;
 
 import java.sql.Connection;
@@ -96,7 +93,6 @@ public final class JobConfigurationBuilder {
         result.setSources(sources);
         result.setTarget(createYamlPipelineDataSourceConfiguration(new 
ShardingSpherePipelineDataSourceConfiguration(
                 
ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}",
 databaseNameSuffix))));
-        ((TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, 
"MIGRATION").getOption()).extendYamlJobConfiguration(contextKey, result);
         return result;
     }
     

Reply via email to