This is an automated email from the ASF dual-hosted git repository.
machen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new c4d1fd2aa78 Impl create/alter/show ProcessConfiguration in pipeline
job API (#20317)
c4d1fd2aa78 is described below
commit c4d1fd2aa78e6654653c32aaded1d8a9752d87a4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 20 16:44:44 2022 +0800
Impl create/alter/show ProcessConfiguration in pipeline job API (#20317)
* Extract PipelineMetaDataPersistService
* Unify addMigrationSourceResources parameter name
* Unify meta data dataSources naming
* Add PipelineProcessConfigurationPersistService
* Impl create/alter/show ProcessConfiguration in pipeline job API
---
.../pipeline/YamlPipelineProcessConfiguration.java | 22 ++++++++
.../pipeline/YamlPipelineReadConfiguration.java | 23 ++++++++
.../pipeline/YamlPipelineWriteConfiguration.java | 20 +++++++
.../data/pipeline/api/MigrationJobPublicAPI.java | 8 +--
.../data/pipeline/api/PipelineJobPublicAPI.java | 22 ++++++++
.../pipeline/core/api/GovernanceRepositoryAPI.java | 28 +++++++--
...PI.java => PipelineMetaDataPersistService.java} | 25 ++++----
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 45 ++++++++++++++-
.../core/api/impl/GovernanceRepositoryAPIImpl.java | 16 +++++-
....java => PipelineDataSourcePersistService.java} | 18 +++---
...PipelineProcessConfigurationPersistService.java | 50 ++++++++++++++++
.../context/AbstractPipelineProcessContext.java | 33 +----------
.../core/exception/PipelineMetaDataException.java | 23 +++-----
.../core/metadata/node/PipelineMetaDataNode.java | 6 +-
.../util/PipelineProcessConfigurationUtils.java | 63 +++++++++++++++++++++
.../scenario/migration/MigrationJobAPIImpl.java | 32 ++++++-----
.../core/fixture/MigrationJobAPIFixture.java | 15 ++++-
.../core/api/impl/MigrationJobAPIImplTest.java | 5 +-
...lineProcessConfigurationPersistServiceTest.java | 66 ++++++++++++++++++++++
19 files changed, 417 insertions(+), 103 deletions(-)
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
index 3cd18b1864e..07e1d1f20ca 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -36,4 +36,26 @@ public final class YamlPipelineProcessConfiguration
implements YamlConfiguration
private YamlPipelineWriteConfiguration write;
private YamlAlgorithmConfiguration streamChannel;
+
+    /**
+     * Copy non-null fields from another.
+     *
+     * <p>Every field that is non-null in {@code another} overrides the value held by this configuration,
+     * fields that are null in {@code another} leave this configuration untouched.</p>
+     *
+     * @param another another configuration, nothing is copied when it is null
+     */
+    // TODO add unit test
+    public void copyNonNullFields(final YamlPipelineProcessConfiguration another) {
+        if (null == another) {
+            return;
+        }
+        if (null == read) {
+            read = another.getRead();
+        } else {
+            read.copyNonNullFields(another.getRead());
+        }
+        if (null == write) {
+            write = another.getWrite();
+        } else {
+            write.copyNonNullFields(another.getWrite());
+        }
+        // Test the incoming value, not the current one: otherwise an existing stream channel could never be replaced.
+        if (null != another.getStreamChannel()) {
+            streamChannel = another.getStreamChannel();
+        }
+    }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
index 4e13aabca34..7f392856597 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineReadConfiguration.java
@@ -64,4 +64,27 @@ public final class YamlPipelineReadConfiguration implements
YamlConfiguration {
shardingSize = DEFAULT_SHARDING_SIZE;
}
}
+
+    /**
+     * Take over every non-null field of another configuration, fields that are null there keep their current value.
+     *
+     * @param another another configuration, nothing is copied when it is null
+     */
+    public void copyNonNullFields(final YamlPipelineReadConfiguration another) {
+        if (null != another) {
+            workerThread = null == another.getWorkerThread() ? workerThread : another.getWorkerThread();
+            batchSize = null == another.getBatchSize() ? batchSize : another.getBatchSize();
+            shardingSize = null == another.getShardingSize() ? shardingSize : another.getShardingSize();
+            rateLimiter = null == another.getRateLimiter() ? rateLimiter : another.getRateLimiter();
+        }
+    }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
index fb5460ae6b6..ef0d488826c 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineWriteConfiguration.java
@@ -57,4 +57,24 @@ public final class YamlPipelineWriteConfiguration implements
YamlConfiguration {
batchSize = DEFAULT_BATCH_SIZE;
}
}
+
+    /**
+     * Copy non-null fields from another.
+     *
+     * <p>Every field that is non-null in {@code another} overrides the value held by this configuration,
+     * fields that are null in {@code another} leave this configuration untouched.</p>
+     *
+     * @param another another configuration, nothing is copied when it is null
+     */
+    public void copyNonNullFields(final YamlPipelineWriteConfiguration another) {
+        if (null == another) {
+            return;
+        }
+        if (null != another.getWorkerThread()) {
+            // Read through the getter, consistent with the null check above and with the other fields.
+            workerThread = another.getWorkerThread();
+        }
+        if (null != another.getBatchSize()) {
+            batchSize = another.getBatchSize();
+        }
+        if (null != another.getRateLimiter()) {
+            rateLimiter = another.getRateLimiter();
+        }
+    }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index f370af21eba..a31bd8aa037 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -114,14 +114,14 @@ public interface MigrationJobPublicAPI extends
PipelineJobPublicAPI, RequiredSPI
void reset(String jobId);
/**
- * Update migration source resource.
+ * Add migration source resources.
*
- * @param sourcePropertiesMap source properties map
+ * @param dataSourcePropsMap data source properties map
*/
- void addMigrationSourceResources(Map<String, DataSourceProperties>
sourcePropertiesMap);
+ void addMigrationSourceResources(Map<String, DataSourceProperties>
dataSourcePropsMap);
/**
- * Drop migration source resource.
+ * Drop migration source resources.
*
* @param resourceNames resource names
*/
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index 13c9aa5de7e..1d7903a9a24 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import java.util.List;
@@ -27,6 +28,27 @@ import java.util.List;
*/
public interface PipelineJobPublicAPI extends TypedSPI {
+ /**
+ * Create process configuration.
+ *
+ * <p>NOTE(review): the AbstractPipelineJobAPIImpl implementation in this commit throws
+ * PipelineMetaDataException when a process configuration is already persisted for its job type.</p>
+ *
+ * @param processConfig process configuration
+ */
+ void createProcessConfiguration(PipelineProcessConfiguration
processConfig);
+
+ /**
+ * Alter process configuration.
+ *
+ * <p>NOTE(review): the AbstractPipelineJobAPIImpl implementation in this commit merges the given
+ * configuration into the persisted one through YamlPipelineProcessConfiguration.copyNonNullFields
+ * and throws PipelineMetaDataException when nothing is persisted for its job type.</p>
+ *
+ * @param processConfig process configuration
+ */
+ void alterProcessConfiguration(PipelineProcessConfiguration processConfig);
+
+ /**
+ * Show process configuration.
+ *
+ * <p>NOTE(review): the AbstractPipelineJobAPIImpl implementation in this commit loads the persisted
+ * configuration and passes it through PipelineProcessConfigurationUtils.convertWithDefaultValue.</p>
+ *
+ * @return process configuration
+ */
+ PipelineProcessConfiguration showProcessConfiguration();
+
/**
* Start disabled job.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
index a1181ada319..d745f71b1e3 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/GovernanceRepositoryAPI.java
@@ -110,18 +110,34 @@ public interface GovernanceRepositoryAPI {
List<Integer> getShardingItems(String jobId);
/**
- * Get migration source data source.
+ * Get meta data data sources.
*
* @param jobType job type
- * @return migration source data source
+ * @return data source properties
*/
- String getMetaDataDataSource(JobType jobType);
+ String getMetaDataDataSources(JobType jobType);
/**
- * Persist meta data data source.
+ * Persist meta data data sources.
*
* @param jobType job type
- * @param metaDataDataSource meta data data source
+ * @param metaDataDataSources data source properties
*/
- void persistMetaDataDataSource(JobType jobType, String metaDataDataSource);
+ void persistMetaDataDataSources(JobType jobType, String
metaDataDataSources);
+
+ /**
+ * Get meta data process configuration.
+ *
+ * <p>NOTE(review): PipelineProcessConfigurationPersistService in this commit treats a blank
+ * result as "no process configuration persisted".</p>
+ *
+ * @param jobType job type, nullable
+ * @return process configuration YAML text
+ */
+ String getMetaDataProcessConfiguration(JobType jobType);
+
+ /**
+ * Persist meta data process configuration.
+ *
+ * <p>NOTE(review): GovernanceRepositoryAPIImpl in this commit stores the text under the path
+ * returned by PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType).</p>
+ *
+ * @param jobType job type, nullable
+ * @param processConfigYamlText process configuration YAML text
+ */
+ void persistMetaDataProcessConfiguration(JobType jobType, String
processConfigYamlText);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
similarity index 64%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
index 81d95798cc1..517fbe08fed 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineResourceAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineMetaDataPersistService.java
@@ -18,28 +18,27 @@
package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-
-import java.util.Map;
/**
- * Pipeline resource API.
+ * Pipeline meta data persist service.
+ *
+ * @param <T> type of configuration
*/
-public interface PipelineResourceAPI {
+public interface PipelineMetaDataPersistService<T> {
/**
- * Get meta data data source.
+ * Load meta data.
*
- * @param jobType job type
- * @return meta data data source
+ * @param jobType job type, nullable
+ * @return configurations
*/
- Map<String, DataSourceProperties> getMetaDataDataSource(JobType jobType);
+ T load(JobType jobType);
/**
- * Persist meta data data source.
+ * Persist meta data.
*
- * @param jobType job type
- * @param dataSource data source
+ * @param jobType job type, nullable
+ * @param configs configurations
*/
- void persistMetaDataDataSource(JobType jobType, Map<String,
DataSourceProperties> dataSource);
+ void persist(JobType jobType, T configs);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index f2f6a3cb583..d1a3b38da4e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -18,12 +18,10 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
import com.google.common.base.Preconditions;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
@@ -32,20 +30,28 @@ import
org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineMetaDataException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Abstract pipeline job API impl.
@@ -55,8 +61,41 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
protected static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ private static final YamlPipelineProcessConfigurationSwapper
PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+ private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+
private final PipelineDistributedBarrier pipelineDistributedBarrier =
PipelineDistributedBarrier.getInstance();
+ /**
+ * Get job type.
+ *
+ * @return job type this API passes to the process configuration persist service
+ */
+ protected abstract JobType getJobType();
+
+    @Override
+    public void createProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        // Creation is only allowed while nothing is persisted for this job type.
+        if (null != processConfigPersistService.load(getJobType())) {
+            throw new PipelineMetaDataException("Process configuration already exists");
+        }
+        processConfigPersistService.persist(getJobType(), processConfig);
+    }
+
+    @Override
+    public void alterProcessConfiguration(final PipelineProcessConfiguration processConfig) {
+        PipelineProcessConfiguration existingProcessConfig = processConfigPersistService.load(getJobType());
+        // Altering requires a previously created configuration to merge into.
+        if (null == existingProcessConfig) {
+            throw new PipelineMetaDataException("Process configuration does not exist");
+        }
+        // Merge on the YAML representation, which is where copyNonNullFields is defined, then swap back for persisting.
+        YamlPipelineProcessConfiguration targetYamlProcessConfig = PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(existingProcessConfig);
+        targetYamlProcessConfig.copyNonNullFields(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig));
+        processConfigPersistService.persist(getJobType(), PROCESS_CONFIG_SWAPPER.swapToObject(targetYamlProcessConfig));
+    }
+
+    @Override
+    public PipelineProcessConfiguration showProcessConfiguration() {
+        // Whatever was loaded is handed to convertWithDefaultValue before being returned.
+        PipelineProcessConfiguration persistedProcessConfig = processConfigPersistService.load(getJobType());
+        return PipelineProcessConfigurationUtils.convertWithDefaultValue(persistedProcessConfig);
+    }
+
@Override
public final String marshalJobId(final PipelineJobId pipelineJobId) {
return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) +
marshalJobIdLeftPart(pipelineJobId);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
index 09ed414e51a..a6dbbd1cc5b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/GovernanceRepositoryAPIImpl.java
@@ -95,12 +95,22 @@ public final class GovernanceRepositoryAPIImpl implements
GovernanceRepositoryAP
}
@Override
- public String getMetaDataDataSource(final JobType jobType) {
+ public String getMetaDataDataSources(final JobType jobType) {
return
repository.get(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
}
@Override
- public void persistMetaDataDataSource(final JobType jobType, final String
metaDataDataSource) {
-
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType),
metaDataDataSource);
+ public void persistMetaDataDataSources(final JobType jobType, final String
metaDataDataSources) {
+
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType),
metaDataDataSources);
+ }
+
+    @Override
+    public String getMetaDataProcessConfiguration(final JobType jobType) {
+        // Resolve the node path for the job type first, then read the text stored under it.
+        String processConfigPath = PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType);
+        return repository.get(processConfigPath);
+    }
+
+    @Override
+    public void persistMetaDataProcessConfiguration(final JobType jobType, final String processConfigYamlText) {
+        // Resolve the node path for the job type first, then store the YAML text under it.
+        String processConfigPath = PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType);
+        repository.persist(processConfigPath, processConfigYamlText);
+    }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
similarity index 81%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
index 00bc24ecbef..4ae5a9a8c44 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineResourceAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineDataSourcePersistService.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import org.apache.commons.lang3.StringUtils;
import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
@@ -31,16 +31,16 @@ import java.util.Map;
import java.util.Map.Entry;
/**
- * Pipeline resource API implementation.
+ * Pipeline data source persist service.
*/
-public final class PipelineResourceAPIImpl implements PipelineResourceAPI {
+public final class PipelineDataSourcePersistService implements
PipelineMetaDataPersistService<Map<String, DataSourceProperties>> {
private static final YamlDataSourceConfigurationSwapper
DATA_SOURCE_CONFIG_SWAPPER = new YamlDataSourceConfigurationSwapper();
@Override
@SuppressWarnings("unchecked")
- public Map<String, DataSourceProperties> getMetaDataDataSource(final
JobType jobType) {
- String dataSourcesProperties =
PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataDataSource(jobType);
+ public Map<String, DataSourceProperties> load(final JobType jobType) {
+ String dataSourcesProperties =
PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataDataSources(jobType);
if (StringUtils.isBlank(dataSourcesProperties)) {
return Collections.emptyMap();
}
@@ -51,11 +51,11 @@ public final class PipelineResourceAPIImpl implements
PipelineResourceAPI {
}
@Override
- public void persistMetaDataDataSource(final JobType jobType, final
Map<String, DataSourceProperties> dataSourceConfigs) {
- Map<String, Map<String, Object>> dataSourceMap = new
LinkedHashMap<>(dataSourceConfigs.size());
- for (Entry<String, DataSourceProperties> entry :
dataSourceConfigs.entrySet()) {
+ public void persist(final JobType jobType, final Map<String,
DataSourceProperties> dataSourcePropsMap) {
+ Map<String, Map<String, Object>> dataSourceMap = new
LinkedHashMap<>(dataSourcePropsMap.size());
+ for (Entry<String, DataSourceProperties> entry :
dataSourcePropsMap.entrySet()) {
dataSourceMap.put(entry.getKey(),
DATA_SOURCE_CONFIG_SWAPPER.swapToMap(entry.getValue()));
}
-
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSource(jobType,
YamlEngine.marshal(dataSourceMap));
+
PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataDataSources(jobType,
YamlEngine.marshal(dataSourceMap));
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
new file mode 100644
index 00000000000..e57e08c89dd
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.PipelineMetaDataPersistService;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
+
+/**
+ * Persist service that loads and stores the pipeline process configuration as YAML text.
+ */
+public final class PipelineProcessConfigurationPersistService implements PipelineMetaDataPersistService<PipelineProcessConfiguration> {
+
+    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+    @Override
+    public PipelineProcessConfiguration load(final JobType jobType) {
+        String persistedText = PipelineAPIFactory.getGovernanceRepositoryAPI().getMetaDataProcessConfiguration(jobType);
+        // Blank text means nothing is persisted, which callers see as null.
+        boolean absent = StringUtils.isBlank(persistedText);
+        return absent ? null : PROCESS_CONFIG_SWAPPER.swapToObject(YamlEngine.unmarshal(persistedText, YamlPipelineProcessConfiguration.class, true));
+    }
+
+    @Override
+    public void persist(final JobType jobType, final PipelineProcessConfiguration processConfig) {
+        // Swap and marshal completely before touching the governance repository.
+        YamlPipelineProcessConfiguration yamlProcessConfig = PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(processConfig);
+        String marshalledText = YamlEngine.marshal(yamlProcessConfig);
+        PipelineAPIFactory.getGovernanceRepositoryAPI().persistMetaDataProcessConfiguration(jobType, marshalledText);
+    }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
index 774ca9e6145..7079eeb5c9d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
@@ -24,22 +24,15 @@ import
org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineReadConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineWriteConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
-
-import java.util.Properties;
/**
* Abstract pipeline process context.
@@ -48,8 +41,6 @@ import java.util.Properties;
@Slf4j
public abstract class AbstractPipelineProcessContext implements
PipelineProcessContext {
- private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new
YamlPipelineProcessConfigurationSwapper();
-
private final PipelineProcessConfiguration pipelineProcessConfig;
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
@@ -65,7 +56,7 @@ public abstract class AbstractPipelineProcessContext
implements PipelineProcessC
private final LazyInitializer<ExecuteEngine>
importerExecuteEngineLazyInitializer;
public AbstractPipelineProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
- PipelineProcessConfiguration processConfig =
convertProcessConfig(originalProcessConfig);
+ PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
this.pipelineProcessConfig = processConfig;
PipelineReadConfiguration readConfig = processConfig.getRead();
AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
@@ -98,24 +89,6 @@ public abstract class AbstractPipelineProcessContext
implements PipelineProcessC
};
}
- private PipelineProcessConfiguration convertProcessConfig(final
PipelineProcessConfiguration originalProcessConfig) {
- YamlPipelineProcessConfiguration yamlActionConfig =
SWAPPER.swapToYamlConfiguration(originalProcessConfig);
- if (null == yamlActionConfig.getRead()) {
-
yamlActionConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
- } else {
- yamlActionConfig.getRead().fillInNullFieldsWithDefaultValue();
- }
- if (null == yamlActionConfig.getWrite()) {
-
yamlActionConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
- } else {
- yamlActionConfig.getWrite().fillInNullFieldsWithDefaultValue();
- }
- if (null == yamlActionConfig.getStreamChannel()) {
- yamlActionConfig.setStreamChannel(new
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new
Properties()));
- }
- return SWAPPER.swapToObject(yamlActionConfig);
- }
-
/**
* Get inventory dumper execute engine.
*
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineMetaDataException.java
similarity index 57%
copy from
shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineMetaDataException.java
index 3cd18b1864e..88769d0b70b 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/PipelineMetaDataException.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+package org.apache.shardingsphere.data.pipeline.core.exception;
/**
- * YAML pipeline process configuration.
+ * Pipeline meta data exception.
*/
-@Getter
-@Setter
-@ToString
-public final class YamlPipelineProcessConfiguration implements
YamlConfiguration {
-
- private YamlPipelineReadConfiguration read;
+public final class PipelineMetaDataException extends RuntimeException {
- private YamlPipelineWriteConfiguration write;
+ private static final long serialVersionUID = 1L;
- private YamlAlgorithmConfiguration streamChannel;
+ public PipelineMetaDataException(final String message) {
+ super(message);
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index 9bd4767593e..e4e36bd727d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -47,7 +47,11 @@ public final class PipelineMetaDataNode {
}
private static String getMetaDataRootPath(final JobType jobType) {
- return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobType.getLowercaseTypeName(), "metadata");
+ if (null != jobType) {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobType.getLowercaseTypeName(), "metadata");
+ } else {
+ return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
"metadata");
+ }
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
new file mode 100644
index 00000000000..b267cfea044
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineProcessConfigurationUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
+
+import java.util.Properties;
+
+/**
+ * Pipeline process configuration utils.
+ */
+public final class PipelineProcessConfigurationUtils {
+
+    private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+    private PipelineProcessConfigurationUtils() {
+    }
+
+    /**
+     * Convert to a process configuration whose missing sections and null fields are filled in with default values.
+     * The input is always round-tripped through its YAML form, because a non-null read or write section may still carry null fields.
+     * @param originalConfig original process configuration, nullable
+     * @return process configuration with default values filled in
+     */
+    public static PipelineProcessConfiguration convertWithDefaultValue(final PipelineProcessConfiguration originalConfig) {
+        YamlPipelineProcessConfiguration yamlConfig = null != originalConfig ? SWAPPER.swapToYamlConfiguration(originalConfig) : new YamlPipelineProcessConfiguration();
+        if (null == yamlConfig.getRead()) {
+            yamlConfig.setRead(YamlPipelineReadConfiguration.buildWithDefaultValue());
+        } else {
+            yamlConfig.getRead().fillInNullFieldsWithDefaultValue();
+        }
+        if (null == yamlConfig.getWrite()) {
+            yamlConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
+        } else {
+            yamlConfig.getWrite().fillInNullFieldsWithDefaultValue();
+        }
+        if (null == yamlConfig.getStreamChannel()) {
+            yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new Properties()));
+        }
+        return SWAPPER.swapToObject(yamlConfig);
+    }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 06ce962775c..aef8bb6501a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -40,10 +40,9 @@ import
org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobItemAPI;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.AbstractPipelineJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.InventoryIncrementalJobItemAPIImpl;
-import
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineResourceAPIImpl;
+import
org.apache.shardingsphere.data.pipeline.core.api.impl.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -89,7 +88,12 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
private final PipelineJobItemAPI jobItemAPI = new
InventoryIncrementalJobItemAPIImpl();
- private final PipelineResourceAPI pipelineResourceAPI = new
PipelineResourceAPIImpl();
+ private final PipelineDataSourcePersistService dataSourcePersistService =
new PipelineDataSourcePersistService();
+
+ @Override
+ protected JobType getJobType() {
+ return JobType.MIGRATION;
+ }
@Override
protected String marshalJobIdLeftPart(final PipelineJobId pipelineJobId) {
@@ -119,7 +123,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
private String generateJobId(final YamlMigrationJobConfiguration config) {
MigrationJobId jobId = new MigrationJobId();
- jobId.setTypeCode(JobType.MIGRATION.getTypeCode());
+ jobId.setTypeCode(getJobType().getTypeCode());
jobId.setFormatVersion(MigrationJobId.CURRENT_VERSION);
jobId.setCurrentMetadataVersion(config.getActiveVersion());
jobId.setNewMetadataVersion(config.getNewVersion());
@@ -402,11 +406,11 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
}
@Override
- public void addMigrationSourceResources(final Map<String,
DataSourceProperties> dataSourceProperties) {
- log.info("Add migration source resources {}",
dataSourceProperties.keySet());
- Map<String, DataSourceProperties> existDataSources =
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
- Collection<String> duplicateDataSourceNames = new
HashSet<>(dataSourceProperties.size(), 1);
- for (Entry<String, DataSourceProperties> entry :
dataSourceProperties.entrySet()) {
+ public void addMigrationSourceResources(final Map<String,
DataSourceProperties> dataSourcePropsMap) {
+ log.info("Add migration source resources {}",
dataSourcePropsMap.keySet());
+ Map<String, DataSourceProperties> existDataSources =
dataSourcePersistService.load(getJobType());
+ Collection<String> duplicateDataSourceNames = new
HashSet<>(dataSourcePropsMap.size(), 1);
+ for (Entry<String, DataSourceProperties> entry :
dataSourcePropsMap.entrySet()) {
if (existDataSources.containsKey(entry.getKey())) {
duplicateDataSourceNames.add(entry.getKey());
}
@@ -415,13 +419,13 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
throw new
AddMigrationSourceResourceException(String.format("Duplicate resource names
%s.", duplicateDataSourceNames));
}
Map<String, DataSourceProperties> result = new
LinkedHashMap<>(existDataSources);
- result.putAll(dataSourceProperties);
- pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION,
result);
+ result.putAll(dataSourcePropsMap);
+ dataSourcePersistService.persist(getJobType(), result);
}
@Override
public void dropMigrationSourceResources(final Collection<String>
resourceNames) {
- Map<String, DataSourceProperties> metaDataDataSource =
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+ Map<String, DataSourceProperties> metaDataDataSource =
dataSourcePersistService.load(getJobType());
List<String> noExistResources = resourceNames.stream().filter(each ->
!metaDataDataSource.containsKey(each)).collect(Collectors.toList());
if (!noExistResources.isEmpty()) {
throw new
DropMigrationSourceResourceException(String.format("Resource names %s not
exist.", resourceNames));
@@ -429,11 +433,11 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
for (String each : resourceNames) {
metaDataDataSource.remove(each);
}
- pipelineResourceAPI.persistMetaDataDataSource(JobType.MIGRATION,
metaDataDataSource);
+ dataSourcePersistService.persist(getJobType(), metaDataDataSource);
}
@Override
public String getType() {
- return JobType.MIGRATION.getTypeName();
+ return getJobType().getTypeName();
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
index 81e59f59ed7..424ee393d5b 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
@@ -50,6 +50,19 @@ public final class MigrationJobAPIFixture implements
MigrationJobAPI {
public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
}
+ @Override
+ public void createProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
+ }
+
+ @Override
+ public void alterProcessConfiguration(final PipelineProcessConfiguration
processConfig) {
+ }
+
+ @Override
+ public PipelineProcessConfiguration showProcessConfiguration() {
+ return null;
+ }
+
@Override
public void startDisabledJob(final String jobId) {
}
@@ -146,7 +159,7 @@ public final class MigrationJobAPIFixture implements
MigrationJobAPI {
}
@Override
- public void addMigrationSourceResources(final Map<String,
DataSourceProperties> sourcePropertiesMap) {
+ public void addMigrationSourceResources(final Map<String,
DataSourceProperties> dataSourcePropsMap) {
}
@Override
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index f56f0ce16bc..2b292de129b 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -31,7 +31,6 @@ import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.api.PipelineResourceAPI;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -278,8 +277,8 @@ public final class MigrationJobAPIImplTest {
Map<String, DataSourceProperties> expect = new LinkedHashMap<>(1, 1);
expect.put("ds_0", new
DataSourceProperties("com.zaxxer.hikari.HikariDataSource", props));
jobAPI.addMigrationSourceResources(expect);
- PipelineResourceAPI pipelineResourceAPI = new
PipelineResourceAPIImpl();
- Map<String, DataSourceProperties> actual =
pipelineResourceAPI.getMetaDataDataSource(JobType.MIGRATION);
+ PipelineDataSourcePersistService persistService = new
PipelineDataSourcePersistService();
+ Map<String, DataSourceProperties> actual =
persistService.load(JobType.MIGRATION);
assertTrue(actual.containsKey("ds_0"));
}
}
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
new file mode 100644
index 00000000000..c959637a971
--- /dev/null
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/PipelineProcessConfigurationPersistServiceTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.api.impl;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineReadConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineWriteConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PipelineProcessConfigurationPersistServiceTest {
+
+    private static final YamlPipelineProcessConfigurationSwapper PROCESS_CONFIG_SWAPPER = new YamlPipelineProcessConfigurationSwapper();
+
+    @BeforeClass
+    public static void beforeClass() {
+        PipelineContextUtil.mockModeConfigAndContextManager();
+    }
+
+    @Test
+    public void assertLoadAndPersist() {
+        // Assemble the YAML configuration to persist; only the read sharding size is overridden after the defaults are built.
+        YamlPipelineReadConfiguration readConfig = YamlPipelineReadConfiguration.buildWithDefaultValue();
+        readConfig.fillInNullFieldsWithDefaultValue();
+        readConfig.setShardingSize(10);
+        YamlPipelineProcessConfiguration expectedYamlConfig = new YamlPipelineProcessConfiguration();
+        expectedYamlConfig.setRead(readConfig);
+        expectedYamlConfig.setWrite(YamlPipelineWriteConfiguration.buildWithDefaultValue());
+        expectedYamlConfig.setStreamChannel(new YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new Properties()));
+        String expectedYamlText = YamlEngine.marshal(expectedYamlConfig);
+        // Persist the swapped configuration, load it back, and compare the marshalled YAML text.
+        PipelineProcessConfiguration toBePersisted = PROCESS_CONFIG_SWAPPER.swapToObject(expectedYamlConfig);
+        PipelineProcessConfigurationPersistService service = new PipelineProcessConfigurationPersistService();
+        service.persist(JobType.MIGRATION, toBePersisted);
+        PipelineProcessConfiguration loaded = service.load(JobType.MIGRATION);
+        String actualYamlText = YamlEngine.marshal(PROCESS_CONFIG_SWAPPER.swapToYamlConfiguration(loaded));
+        assertThat(actualYamlText, is(expectedYamlText));
+    }
+}