This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 13901915820 Refactor RuleAlteredContext for common usage (#19903)
13901915820 is described below
commit 139019158209d8f02eadfd52979b996933dccb7e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 5 18:45:30 2022 +0800
Refactor RuleAlteredContext for common usage (#19903)
* Add PipelineProcessConfiguration and yaml swapper
* Add PipelineProcessContext
* Refactor RuleAlteredJobConfigurationPreparer
* Add test
---
...hardingRuleAlteredJobConfigurationPreparer.java | 12 +--
.../pipeline/PipelineProcessConfiguration.java | 38 +++++++
.../pipeline/YamlPipelineProcessConfiguration.java | 39 +++++++
.../YamlPipelineProcessConfigurationSwapper.java | 58 +++++++++++
...amlPipelineProcessConfigurationSwapperTest.java | 68 ++++++++++++
.../RuleAlteredJobConfigurationPreparer.java | 6 +-
...RuleAlteredJobConfigurationPreparerFixture.java | 4 +-
.../context/AbstractPipelineProcessContext.java} | 115 +++++++++++++--------
.../core/context/PipelineProcessContext.java | 24 +++++
.../scenario/rulealtered/RuleAlteredContext.java | 75 ++------------
.../rulealtered/RuleAlteredJobContext.java | 2 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 7 +-
.../rulealtered/prepare/InventoryTaskSplitter.java | 4 +-
.../rulealtered/RuleAlteredJobWorkerTest.java | 2 +-
14 files changed, 322 insertions(+), 132 deletions(-)
diff --git a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
index 1e6b5bca95f..981150126c2 100644
--- a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingRuleAlteredJobConfigurationPreparer.java
@@ -35,12 +35,12 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
import
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
@@ -153,7 +153,7 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
// TODO use jobConfig as parameter, jobShardingItem
@Override
- public TaskConfiguration createTaskConfiguration(final
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final
OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+ public TaskConfiguration createTaskConfiguration(final
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
ShardingSpherePipelineDataSourceConfiguration sourceConfig =
getSourceConfiguration(jobConfig);
ShardingRuleConfiguration sourceRuleConfig =
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(sourceConfig.getRootConfig().getRules());
Map<String, DataSourceProperties> dataSourcePropsMap = new
YamlDataSourceConfigurationSwapper().getDataSourcePropertiesMap(sourceConfig.getRootConfig());
@@ -171,7 +171,7 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
Optional<ShardingRuleConfiguration> targetRuleConfig =
getTargetRuleConfiguration(jobConfig);
Set<LogicTableName> reShardNeededTables =
jobConfig.splitLogicTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
Map<LogicTableName, Set<String>> shardingColumnsMap =
getShardingColumnsMap(targetRuleConfig.orElse(sourceRuleConfig),
reShardNeededTables);
- ImporterConfiguration importerConfig =
createImporterConfiguration(jobConfig, onRuleAlteredActionConfig,
shardingColumnsMap, tableNameSchemaNameMapping);
+ ImporterConfiguration importerConfig =
createImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, tableNameSchemaNameMapping);
TaskConfiguration result = new TaskConfiguration(dumperConfig,
importerConfig);
log.info("createTaskConfiguration, dataSourceName={}, result={}",
dataSourceName, result);
return result;
@@ -237,10 +237,10 @@ public final class
ShardingRuleAlteredJobConfigurationPreparer implements RuleAl
return result;
}
- private static ImporterConfiguration createImporterConfiguration(final
RuleAlteredJobConfiguration jobConfig, final OnRuleAlteredActionConfiguration
onRuleAlteredActionConfig,
+ private static ImporterConfiguration createImporterConfiguration(final
RuleAlteredJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getTarget().getType(),
jobConfig.getTarget().getParameter());
- int batchSize = onRuleAlteredActionConfig.getOutput().getBatchSize();
+ int batchSize = pipelineProcessConfig.getOutput().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
return new ImporterConfiguration(dataSourceConfig,
unmodifiable(shardingColumnsMap), tableNameSchemaNameMapping, batchSize,
retryTimes, concurrency);
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
new file mode 100644
index 00000000000..928960d3026
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/config/rule/data/pipeline/PipelineProcessConfiguration.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.config.rule.data.pipeline;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+
+/**
+ * Pipeline process configuration.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineProcessConfiguration {
+
+ private final PipelineInputConfiguration input;
+
+ private final PipelineOutputConfiguration output;
+
+ private final AlgorithmConfiguration streamChannel;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
new file mode 100644
index 00000000000..71bb47943e6
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/pojo/data/pipeline/YamlPipelineProcessConfiguration.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+
+/**
+ * YAML pipeline process configuration.
+ */
+@Getter
+@Setter
+@ToString
+public final class YamlPipelineProcessConfiguration implements
YamlConfiguration {
+
+ private YamlPipelineInputConfiguration input;
+
+ private YamlPipelineOutputConfiguration output;
+
+ private YamlAlgorithmConfiguration streamChannel;
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
new file mode 100644
index 00000000000..cd6a37812d5
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.YamlConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.algorithm.YamlAlgorithmConfigurationSwapper;
+
+/**
+ * YAML pipeline process configuration swapper.
+ */
+public final class YamlPipelineProcessConfigurationSwapper implements
YamlConfigurationSwapper<YamlPipelineProcessConfiguration,
PipelineProcessConfiguration> {
+
+ private static final YamlAlgorithmConfigurationSwapper
ALGORITHM_CONFIG_SWAPPER = new YamlAlgorithmConfigurationSwapper();
+
+ private static final YamlPipelineInputConfigurationSwapper
INPUT_CONFIG_SWAPPER = new YamlPipelineInputConfigurationSwapper();
+
+ private static final YamlPipelineOutputConfigurationSwapper
OUTPUT_CONFIG_SWAPPER = new YamlPipelineOutputConfigurationSwapper();
+
+ @Override
+ public YamlPipelineProcessConfiguration swapToYamlConfiguration(final
PipelineProcessConfiguration data) {
+ if (null == data) {
+ return null;
+ }
+ YamlPipelineProcessConfiguration result = new
YamlPipelineProcessConfiguration();
+ result.setInput(INPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getInput()));
+ result.setOutput(OUTPUT_CONFIG_SWAPPER.swapToYamlConfiguration(data.getOutput()));
+ result.setStreamChannel(ALGORITHM_CONFIG_SWAPPER.swapToYamlConfiguration(data.getStreamChannel()));
+ return result;
+ }
+
+ @Override
+ public PipelineProcessConfiguration swapToObject(final
YamlPipelineProcessConfiguration yamlConfig) {
+ if (null == yamlConfig) {
+ return null;
+ }
+ return new PipelineProcessConfiguration(
+ INPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getInput()),
+ OUTPUT_CONFIG_SWAPPER.swapToObject(yamlConfig.getOutput()),
+ ALGORITHM_CONFIG_SWAPPER.swapToObject(yamlConfig.getStreamChannel()));
+ }
+}
diff --git a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
new file mode 100644
index 00000000000..8558f0197c1
--- /dev/null
+++ b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/yaml/config/swapper/rule/data/pipeline/YamlPipelineProcessConfigurationSwapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline;
+
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import org.junit.Test;
+
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+public final class YamlPipelineProcessConfigurationSwapperTest {
+
+ private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new
YamlPipelineProcessConfigurationSwapper();
+
+ @Test
+ public void assertSwap() {
+ YamlPipelineProcessConfiguration yamlConfig = new
YamlPipelineProcessConfiguration();
+ Properties rateLimiterProps = new Properties();
+ rateLimiterProps.setProperty("batch-size", "1000");
+ rateLimiterProps.setProperty("qps", "50");
+ YamlPipelineInputConfiguration yamlInputConfig =
YamlPipelineInputConfiguration.buildWithDefaultValue();
+ yamlConfig.setInput(yamlInputConfig);
+ yamlInputConfig.setRateLimiter(new YamlAlgorithmConfiguration("INPUT",
rateLimiterProps));
+ YamlPipelineOutputConfiguration yamlOutputConfig =
YamlPipelineOutputConfiguration.buildWithDefaultValue();
+ yamlOutputConfig.setRateLimiter(new
YamlAlgorithmConfiguration("OUTPUT", rateLimiterProps));
+ yamlConfig.setOutput(yamlOutputConfig);
+ Properties streamChannelProps = new Properties();
+ streamChannelProps.setProperty("block-queue-size", "10000");
+ yamlConfig.setStreamChannel(new YamlAlgorithmConfiguration("MEMORY",
streamChannelProps));
+ YamlPipelineProcessConfigurationSwapper
onRuleAlteredActionConfigSwapper = new
YamlPipelineProcessConfigurationSwapper();
+ PipelineProcessConfiguration actualConfig =
onRuleAlteredActionConfigSwapper.swapToObject(yamlConfig);
+ YamlPipelineProcessConfiguration actualYamlConfig =
onRuleAlteredActionConfigSwapper.swapToYamlConfiguration(actualConfig);
+ assertThat(YamlEngine.marshal(actualYamlConfig),
is(YamlEngine.marshal(yamlConfig)));
+ }
+
+ @Test
+ public void assertYamlConfigNull() {
+ assertNull(SWAPPER.swapToYamlConfiguration(null));
+ }
+
+ @Test
+ public void assertConfigNull() {
+ assertNull(SWAPPER.swapToObject(null));
+ }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
index b400ad8fac8..809fa2fa6e7 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/rulealtered/RuleAlteredJobConfigurationPreparer.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.spi.rulealtered;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import org.apache.shardingsphere.spi.type.required.RequiredSPI;
/**
@@ -40,8 +40,8 @@ public interface RuleAlteredJobConfigurationPreparer extends
RequiredSPI {
*
* @param jobConfig job configuration
* @param jobShardingItem job sharding item
- * @param onRuleAlteredActionConfig action configuration
+ * @param pipelineProcessConfig pipeline process configuration
* @return task configuration
*/
- TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration
jobConfig, int jobShardingItem, OnRuleAlteredActionConfiguration
onRuleAlteredActionConfig);
+ TaskConfiguration createTaskConfiguration(RuleAlteredJobConfiguration
jobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
index 8649d74ab42..e454bd44c4a 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/spi/fixture/RuleAlteredJobConfigurationPreparerFixture.java
@@ -21,7 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAltere
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparer;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
public final class RuleAlteredJobConfigurationPreparerFixture implements
RuleAlteredJobConfigurationPreparer {
@@ -30,7 +30,7 @@ public final class RuleAlteredJobConfigurationPreparerFixture
implements RuleAlt
}
@Override
- public TaskConfiguration createTaskConfiguration(final
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final
OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
+ public TaskConfiguration createTaskConfiguration(final
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
return null;
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
similarity index 50%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
index b6e0c69070d..aafe6962271 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/AbstractPipelineProcessContext.java
@@ -15,17 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
+package org.apache.shardingsphere.data.pipeline.core.context;
import lombok.Getter;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
-import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
-import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -33,32 +31,25 @@ import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineProcessConfiguration;
+import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.data.pipeline.YamlPipelineProcessConfigurationSwapper;
import java.util.Properties;
/**
- * Rule altered context.
+ * Abstract pipeline process context.
*/
@Getter
@Slf4j
-// TODO extract Pipeline Context
-public final class RuleAlteredContext {
+public abstract class AbstractPipelineProcessContext {
- private static final String INVENTORY_THREAD_PREFIX = "Inventory-";
+ private static final YamlPipelineProcessConfigurationSwapper SWAPPER = new
YamlPipelineProcessConfigurationSwapper();
- private static final String INCREMENTAL_THREAD_PREFIX = "Incremental-";
-
- private static final String IMPORTER_THREAD_PREFIX = "Importer-";
-
- private static final YamlOnRuleAlteredActionConfigurationSwapper SWAPPER =
new YamlOnRuleAlteredActionConfigurationSwapper();
-
- private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
+ private final PipelineProcessConfiguration pipelineProcessConfig;
private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
@@ -66,41 +57,45 @@ public final class RuleAlteredContext {
private final PipelineChannelCreator pipelineChannelCreator;
- private final
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter>
completionDetectAlgorithm;
+ private final LazyInitializer<ExecuteEngine>
inventoryDumperExecuteEngineLazyInitializer;
- private final DataConsistencyCalculateAlgorithm
dataConsistencyCalculateAlgorithm;
+ private final LazyInitializer<ExecuteEngine>
incrementalDumperExecuteEngineLazyInitializer;
- private final ExecuteEngine inventoryDumperExecuteEngine;
+ private final LazyInitializer<ExecuteEngine>
importerExecuteEngineLazyInitializer;
- private final ExecuteEngine incrementalDumperExecuteEngine;
-
- private final ExecuteEngine importerExecuteEngine;
-
- @SuppressWarnings("unchecked")
- public RuleAlteredContext(final String jobId, final
OnRuleAlteredActionConfiguration actionConfig) {
- OnRuleAlteredActionConfiguration onRuleAlteredActionConfig =
convertActionConfig(actionConfig);
- this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
- PipelineInputConfiguration inputConfig =
onRuleAlteredActionConfig.getInput();
+ public AbstractPipelineProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
+ PipelineProcessConfiguration processConfig =
convertProcessConfig(originalProcessConfig);
+ this.pipelineProcessConfig = processConfig;
+ PipelineInputConfiguration inputConfig = processConfig.getInput();
AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
inputRateLimitAlgorithm = null != inputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
- PipelineOutputConfiguration outputConfig =
onRuleAlteredActionConfig.getOutput();
+ PipelineOutputConfiguration outputConfig = processConfig.getOutput();
AlgorithmConfiguration outputRateLimiter =
outputConfig.getRateLimiter();
outputRateLimitAlgorithm = null != outputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
- AlgorithmConfiguration streamChannel =
onRuleAlteredActionConfig.getStreamChannel();
+ AlgorithmConfiguration streamChannel =
processConfig.getStreamChannel();
pipelineChannelCreator =
PipelineChannelCreatorFactory.newInstance(streamChannel);
- AlgorithmConfiguration completionDetector =
onRuleAlteredActionConfig.getCompletionDetector();
- completionDetectAlgorithm = null != completionDetector ?
JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
- AlgorithmConfiguration dataConsistencyCheckerConfig =
onRuleAlteredActionConfig.getDataConsistencyCalculator();
- dataConsistencyCalculateAlgorithm = null !=
dataConsistencyCheckerConfig
- ?
DataConsistencyCalculateAlgorithmFactory.newInstance(dataConsistencyCheckerConfig.getType(),
dataConsistencyCheckerConfig.getProps())
- : null;
- inventoryDumperExecuteEngine =
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(),
INVENTORY_THREAD_PREFIX + jobId);
- incrementalDumperExecuteEngine =
ExecuteEngine.newCachedThreadInstance(INCREMENTAL_THREAD_PREFIX + jobId);
- importerExecuteEngine =
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(),
IMPORTER_THREAD_PREFIX + jobId);
+ inventoryDumperExecuteEngineLazyInitializer = new
LazyInitializer<ExecuteEngine>() {
+ @Override
+ protected ExecuteEngine initialize() {
+ return
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(),
"Inventory-" + jobId);
+ }
+ };
+ incrementalDumperExecuteEngineLazyInitializer = new
LazyInitializer<ExecuteEngine>() {
+ @Override
+ protected ExecuteEngine initialize() {
+ return ExecuteEngine.newCachedThreadInstance("Incremental-" +
jobId);
+ }
+ };
+ importerExecuteEngineLazyInitializer = new
LazyInitializer<ExecuteEngine>() {
+ @Override
+ protected ExecuteEngine initialize() {
+ return
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(),
"Importer-" + jobId);
+ }
+ };
}
- private OnRuleAlteredActionConfiguration convertActionConfig(final
OnRuleAlteredActionConfiguration actionConfig) {
- YamlOnRuleAlteredActionConfiguration yamlActionConfig =
SWAPPER.swapToYamlConfiguration(actionConfig);
+ private PipelineProcessConfiguration convertProcessConfig(final
PipelineProcessConfiguration originalProcessConfig) {
+ YamlPipelineProcessConfiguration yamlActionConfig =
SWAPPER.swapToYamlConfiguration(originalProcessConfig);
if (null == yamlActionConfig.getInput()) {
yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
} else {
@@ -116,4 +111,34 @@ public final class RuleAlteredContext {
}
return SWAPPER.swapToObject(yamlActionConfig);
}
+
+ /**
+ * Get inventory dumper execute engine.
+ *
+ * @return inventory dumper execute engine
+ */
+ @SneakyThrows(ConcurrentException.class)
+ public ExecuteEngine getInventoryDumperExecuteEngine() {
+ return inventoryDumperExecuteEngineLazyInitializer.get();
+ }
+
+ /**
+ * Get incremental dumper execute engine.
+ *
+ * @return incremental dumper execute engine
+ */
+ @SneakyThrows(ConcurrentException.class)
+ public ExecuteEngine getIncrementalDumperExecuteEngine() {
+ return incrementalDumperExecuteEngineLazyInitializer.get();
+ }
+
+ /**
+ * Get importer execute engine.
+ *
+ * @return importer execute engine
+ */
+ @SneakyThrows(ConcurrentException.class)
+ public ExecuteEngine getImporterExecuteEngine() {
+ return importerExecuteEngineLazyInitializer.get();
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
new file mode 100644
index 00000000000..866173c06b4
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/PipelineProcessContext.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+/**
+ * Pipeline process context.
+ */
+public interface PipelineProcessContext {
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
index b6e0c69070d..81991f5b800 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredContext.java
@@ -21,99 +21,36 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MemoryPipelineChannelCreator;
+import
org.apache.shardingsphere.data.pipeline.core.context.AbstractPipelineProcessContext;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.detect.JobCompletionDetectAlgorithmFactory;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreatorFactory;
-import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithmFactory;
import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineOutputConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineInputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.data.pipeline.YamlPipelineOutputConfiguration;
-import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlOnRuleAlteredActionConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.rulealtered.YamlOnRuleAlteredActionConfigurationSwapper;
-import java.util.Properties;
-
/**
* Rule altered context.
*/
@Getter
@Slf4j
-// TODO extract Pipeline Context
-public final class RuleAlteredContext {
-
- private static final String INVENTORY_THREAD_PREFIX = "Inventory-";
-
- private static final String INCREMENTAL_THREAD_PREFIX = "Incremental-";
-
- private static final String IMPORTER_THREAD_PREFIX = "Importer-";
+public final class RuleAlteredContext extends AbstractPipelineProcessContext {
private static final YamlOnRuleAlteredActionConfigurationSwapper SWAPPER =
new YamlOnRuleAlteredActionConfigurationSwapper();
- private final OnRuleAlteredActionConfiguration onRuleAlteredActionConfig;
-
- private final JobRateLimitAlgorithm inputRateLimitAlgorithm;
-
- private final JobRateLimitAlgorithm outputRateLimitAlgorithm;
-
- private final PipelineChannelCreator pipelineChannelCreator;
-
private final
JobCompletionDetectAlgorithm<RuleAlteredJobAlmostCompletedParameter>
completionDetectAlgorithm;
private final DataConsistencyCalculateAlgorithm
dataConsistencyCalculateAlgorithm;
- private final ExecuteEngine inventoryDumperExecuteEngine;
-
- private final ExecuteEngine incrementalDumperExecuteEngine;
-
- private final ExecuteEngine importerExecuteEngine;
-
@SuppressWarnings("unchecked")
public RuleAlteredContext(final String jobId, final
OnRuleAlteredActionConfiguration actionConfig) {
- OnRuleAlteredActionConfiguration onRuleAlteredActionConfig =
convertActionConfig(actionConfig);
- this.onRuleAlteredActionConfig = onRuleAlteredActionConfig;
- PipelineInputConfiguration inputConfig =
onRuleAlteredActionConfig.getInput();
- AlgorithmConfiguration inputRateLimiter = inputConfig.getRateLimiter();
- inputRateLimitAlgorithm = null != inputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(inputRateLimiter) : null;
- PipelineOutputConfiguration outputConfig =
onRuleAlteredActionConfig.getOutput();
- AlgorithmConfiguration outputRateLimiter =
outputConfig.getRateLimiter();
- outputRateLimitAlgorithm = null != outputRateLimiter ?
JobRateLimitAlgorithmFactory.newInstance(outputRateLimiter) : null;
- AlgorithmConfiguration streamChannel =
onRuleAlteredActionConfig.getStreamChannel();
- pipelineChannelCreator =
PipelineChannelCreatorFactory.newInstance(streamChannel);
- AlgorithmConfiguration completionDetector =
onRuleAlteredActionConfig.getCompletionDetector();
+ super(jobId, new PipelineProcessConfiguration(actionConfig.getInput(),
actionConfig.getOutput(), actionConfig.getStreamChannel()));
+ AlgorithmConfiguration completionDetector =
actionConfig.getCompletionDetector();
completionDetectAlgorithm = null != completionDetector ?
JobCompletionDetectAlgorithmFactory.newInstance(completionDetector) : null;
- AlgorithmConfiguration dataConsistencyCheckerConfig =
onRuleAlteredActionConfig.getDataConsistencyCalculator();
+ AlgorithmConfiguration dataConsistencyCheckerConfig =
actionConfig.getDataConsistencyCalculator();
dataConsistencyCalculateAlgorithm = null !=
dataConsistencyCheckerConfig
?
DataConsistencyCalculateAlgorithmFactory.newInstance(dataConsistencyCheckerConfig.getType(),
dataConsistencyCheckerConfig.getProps())
: null;
- inventoryDumperExecuteEngine =
ExecuteEngine.newFixedThreadInstance(inputConfig.getWorkerThread(),
INVENTORY_THREAD_PREFIX + jobId);
- incrementalDumperExecuteEngine =
ExecuteEngine.newCachedThreadInstance(INCREMENTAL_THREAD_PREFIX + jobId);
- importerExecuteEngine =
ExecuteEngine.newFixedThreadInstance(outputConfig.getWorkerThread(),
IMPORTER_THREAD_PREFIX + jobId);
- }
-
- private OnRuleAlteredActionConfiguration convertActionConfig(final
OnRuleAlteredActionConfiguration actionConfig) {
- YamlOnRuleAlteredActionConfiguration yamlActionConfig =
SWAPPER.swapToYamlConfiguration(actionConfig);
- if (null == yamlActionConfig.getInput()) {
-
yamlActionConfig.setInput(YamlPipelineInputConfiguration.buildWithDefaultValue());
- } else {
- yamlActionConfig.getInput().fillInNullFieldsWithDefaultValue();
- }
- if (null == yamlActionConfig.getOutput()) {
-
yamlActionConfig.setOutput(YamlPipelineOutputConfiguration.buildWithDefaultValue());
- } else {
- yamlActionConfig.getOutput().fillInNullFieldsWithDefaultValue();
- }
- if (null == yamlActionConfig.getStreamChannel()) {
- yamlActionConfig.setStreamChannel(new
YamlAlgorithmConfiguration(MemoryPipelineChannelCreator.TYPE, new
Properties()));
- }
- return SWAPPER.swapToObject(yamlActionConfig);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index a8c968b4aea..65f9b35d133 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -94,7 +94,7 @@ public final class RuleAlteredJobContext {
this.initProgress = initProgress;
this.dataSourceManager = dataSourceManager;
this.jobPreparer = jobPreparer;
- taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig,
jobShardingItem, ruleAlteredContext.getOnRuleAlteredActionConfig());
+ taskConfig = RuleAlteredJobWorker.buildTaskConfig(jobConfig,
jobShardingItem, ruleAlteredContext.getPipelineProcessConfig());
}
/**
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index 24493a664d3..c7e8633ab8e 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -40,6 +40,7 @@ import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetect
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredDetectorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
+import
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
import
org.apache.shardingsphere.infra.config.rule.rulealtered.OnRuleAlteredActionConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
@@ -248,11 +249,11 @@ public final class RuleAlteredJobWorker {
*
* @param jobConfig job configuration
* @param jobShardingItem job sharding item
- * @param onRuleAlteredActionConfig action configuration
+ * @param pipelineProcessConfig pipeline process configuration
* @return task configuration
*/
- public static TaskConfiguration buildTaskConfig(final
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final
OnRuleAlteredActionConfiguration onRuleAlteredActionConfig) {
- return
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
jobShardingItem, onRuleAlteredActionConfig);
+ public static TaskConfiguration buildTaskConfig(final
RuleAlteredJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
+ return
RuleAlteredJobConfigurationPreparerFactory.getInstance().createTaskConfiguration(jobConfig,
jobShardingItem, pipelineProcessConfig);
}
private boolean hasUncompletedJobOfSameDatabaseName(final String
databaseName) {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 635711c39c8..fb4609f1dec 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -111,7 +111,7 @@ public final class InventoryTaskSplitter {
final
InventoryDumperConfiguration dumperConfig) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
RuleAlteredContext ruleAlteredContext =
jobContext.getRuleAlteredContext();
- PipelineInputConfiguration inputConfig =
ruleAlteredContext.getOnRuleAlteredActionConfig().getInput();
+ PipelineInputConfiguration inputConfig =
ruleAlteredContext.getPipelineProcessConfig().getInput();
int batchSize = inputConfig.getBatchSize();
JobRateLimitAlgorithm rateLimitAlgorithm =
ruleAlteredContext.getInputRateLimitAlgorithm();
Collection<IngestPosition<?>> inventoryPositions =
getInventoryPositions(jobContext, dumperConfig, dataSource, metaDataLoader);
@@ -192,7 +192,7 @@ public final class InventoryTaskSplitter {
RuleAlteredJobConfiguration jobConfig = jobContext.getJobConfig();
String sql =
PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType())
.buildSplitByPrimaryKeyRangeSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), dumperConfig.getUniqueKey());
- int shardingSize =
jobContext.getRuleAlteredContext().getOnRuleAlteredActionConfig().getInput().getShardingSize();
+ int shardingSize =
jobContext.getRuleAlteredContext().getPipelineProcessConfig().getInput().getShardingSize();
try (
Connection connection = dataSource.getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 920cedc9669..d51586c71d9 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -68,7 +68,7 @@ public final class RuleAlteredJobWorkerTest {
@Test
public void assertCreateRuleAlteredContextSuccess() {
-
assertNotNull(RuleAlteredJobWorker.createRuleAlteredContext(JobConfigurationBuilder.createJobConfiguration()).getOnRuleAlteredActionConfig());
+
assertNotNull(RuleAlteredJobWorker.createRuleAlteredContext(JobConfigurationBuilder.createJobConfiguration()).getPipelineProcessConfig());
}
@Test