This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 373a607f321 Merge TransmissionProcessContext and
AbstractTransmissionProcessContext (#29254)
373a607f321 is described below
commit 373a607f3215714f7423efa4636e4cc377af6a09
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 2 00:19:54 2023 +0800
Merge TransmissionProcessContext and AbstractTransmissionProcessContext
(#29254)
---
.../AbstractTransmissionProcessContext.java | 122 ---------------------
.../common/context/TransmissionProcessContext.java | 106 +++++++++++++++---
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 6 +-
.../data/pipeline/cdc/CDCJobOption.java | 5 +-
.../pipeline/cdc/context/CDCJobItemContext.java | 5 +-
.../pipeline/cdc/context/CDCProcessContext.java | 31 ------
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 6 +-
.../task/ConsistencyCheckTasksRunner.java | 3 +-
.../pipeline/scenario/migration/MigrationJob.java | 6 +-
.../scenario/migration/MigrationJobOption.java | 5 +-
.../migration/context/MigrationJobItemContext.java | 7 +-
.../migration/context/MigrationProcessContext.java | 39 -------
.../pipeline/core/util/PipelineContextUtils.java | 6 +-
.../MigrationDataConsistencyCheckerTest.java | 6 +-
14 files changed, 115 insertions(+), 238 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java
deleted file mode 100644
index 62b28959edc..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.context;
-
-import lombok.Getter;
-import lombok.SneakyThrows;
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
-import
org.apache.shardingsphere.data.pipeline.common.util.PipelineLazyInitializer;
-import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-
-/**
- * Abstract transmission process context.
- */
-@Getter
-public abstract class AbstractTransmissionProcessContext implements
TransmissionProcessContext {
-
- private final PipelineProcessConfiguration pipelineProcessConfig;
-
- private final JobRateLimitAlgorithm readRateLimitAlgorithm;
-
- private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
-
- private final PipelineChannelCreator pipelineChannelCreator;
-
- private final PipelineLazyInitializer<ExecuteEngine>
inventoryDumperExecuteEngineLazyInitializer;
-
- private final PipelineLazyInitializer<ExecuteEngine>
inventoryImporterExecuteEngineLazyInitializer;
-
- private final PipelineLazyInitializer<ExecuteEngine>
incrementalExecuteEngineLazyInitializer;
-
- protected AbstractTransmissionProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
- PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
- this.pipelineProcessConfig = processConfig;
- PipelineReadConfiguration readConfig = processConfig.getRead();
- AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
- readRateLimitAlgorithm = null == readRateLimiter ? null :
TypedSPILoader.getService(JobRateLimitAlgorithm.class,
readRateLimiter.getType(), readRateLimiter.getProps());
- PipelineWriteConfiguration writeConfig = processConfig.getWrite();
- AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
- writeRateLimitAlgorithm = null == writeRateLimiter ? null :
TypedSPILoader.getService(JobRateLimitAlgorithm.class,
writeRateLimiter.getType(), writeRateLimiter.getProps());
- AlgorithmConfiguration streamChannel =
processConfig.getStreamChannel();
- pipelineChannelCreator =
TypedSPILoader.getService(PipelineChannelCreator.class,
streamChannel.getType(), streamChannel.getProps());
- inventoryDumperExecuteEngineLazyInitializer = new
PipelineLazyInitializer<ExecuteEngine>() {
-
- @Override
- protected ExecuteEngine doInitialize() {
- return
ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-"
+ jobId);
- }
- };
- inventoryImporterExecuteEngineLazyInitializer = new
PipelineLazyInitializer<ExecuteEngine>() {
-
- @Override
- protected ExecuteEngine doInitialize() {
- return
ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-"
+ jobId);
- }
- };
- incrementalExecuteEngineLazyInitializer = new
PipelineLazyInitializer<ExecuteEngine>() {
-
- @Override
- protected ExecuteEngine doInitialize() {
- return ExecuteEngine.newCachedThreadInstance("Incremental-" +
jobId);
- }
- };
- }
-
- @Override
- @SneakyThrows(ConcurrentException.class)
- public ExecuteEngine getInventoryDumperExecuteEngine() {
- return inventoryDumperExecuteEngineLazyInitializer.get();
- }
-
- @Override
- @SneakyThrows(ConcurrentException.class)
- public ExecuteEngine getInventoryImporterExecuteEngine() {
- return inventoryImporterExecuteEngineLazyInitializer.get();
- }
-
- /**
- * Get incremental execute engine.
- *
- * @return incremental execute engine
- */
- @SneakyThrows(ConcurrentException.class)
- public ExecuteEngine getIncrementalExecuteEngine() {
- return incrementalExecuteEngineLazyInitializer.get();
- }
-
- @Override
- public void close() throws Exception {
- shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
- shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
- shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
- }
-
- private void shutdownExecuteEngine(final
PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws
ConcurrentException {
- if (lazyInitializer.isInitialized()) {
- lazyInitializer.get().shutdown();
- }
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
index e09b9ca9d96..8865cc72214 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
@@ -17,47 +17,117 @@
package org.apache.shardingsphere.data.pipeline.common.context;
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
+import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineWriteConfiguration;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.common.util.PipelineLazyInitializer;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
/**
* Transmission process context.
*/
-public interface TransmissionProcessContext extends PipelineProcessContext {
+public final class TransmissionProcessContext implements
PipelineProcessContext {
- /**
- * Get pipeline channel creator.
- *
- * @return pipeline channel creator
- */
- PipelineChannelCreator getPipelineChannelCreator();
+ @Getter
+ private final PipelineProcessConfiguration pipelineProcessConfig;
+
+ @Getter
+ private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+
+ @Getter
+ private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
+
+ @Getter
+ private final PipelineChannelCreator pipelineChannelCreator;
+
+ private final PipelineLazyInitializer<ExecuteEngine>
inventoryDumperExecuteEngineLazyInitializer;
+
+ private final PipelineLazyInitializer<ExecuteEngine>
inventoryImporterExecuteEngineLazyInitializer;
+
+ private final PipelineLazyInitializer<ExecuteEngine>
incrementalExecuteEngineLazyInitializer;
+
+ public TransmissionProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
+ PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
+ this.pipelineProcessConfig = processConfig;
+ PipelineReadConfiguration readConfig = processConfig.getRead();
+ AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
+ readRateLimitAlgorithm = null == readRateLimiter ? null :
TypedSPILoader.getService(JobRateLimitAlgorithm.class,
readRateLimiter.getType(), readRateLimiter.getProps());
+ PipelineWriteConfiguration writeConfig = processConfig.getWrite();
+ AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
+ writeRateLimitAlgorithm = null == writeRateLimiter ? null :
TypedSPILoader.getService(JobRateLimitAlgorithm.class,
writeRateLimiter.getType(), writeRateLimiter.getProps());
+ AlgorithmConfiguration streamChannel =
processConfig.getStreamChannel();
+ pipelineChannelCreator =
TypedSPILoader.getService(PipelineChannelCreator.class,
streamChannel.getType(), streamChannel.getProps());
+ inventoryDumperExecuteEngineLazyInitializer = new
PipelineLazyInitializer<ExecuteEngine>() {
+
+ @Override
+ protected ExecuteEngine doInitialize() {
+ return
ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-"
+ jobId);
+ }
+ };
+ inventoryImporterExecuteEngineLazyInitializer = new
PipelineLazyInitializer<ExecuteEngine>() {
+
+ @Override
+ protected ExecuteEngine doInitialize() {
+ return
ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-"
+ jobId);
+ }
+ };
+ incrementalExecuteEngineLazyInitializer = new
PipelineLazyInitializer<ExecuteEngine>() {
+
+ @Override
+ protected ExecuteEngine doInitialize() {
+ return ExecuteEngine.newCachedThreadInstance("Incremental-" +
jobId);
+ }
+ };
+ }
/**
* Get inventory dumper execute engine.
*
* @return inventory dumper execute engine
*/
- ExecuteEngine getInventoryDumperExecuteEngine();
+ @SneakyThrows(ConcurrentException.class)
+ public ExecuteEngine getInventoryDumperExecuteEngine() {
+ return inventoryDumperExecuteEngineLazyInitializer.get();
+ }
/**
* Get inventory importer execute engine.
*
* @return inventory importer execute engine
*/
- ExecuteEngine getInventoryImporterExecuteEngine();
+ @SneakyThrows(ConcurrentException.class)
+ public ExecuteEngine getInventoryImporterExecuteEngine() {
+ return inventoryImporterExecuteEngineLazyInitializer.get();
+ }
/**
- * Get job read rate limit algorithm.
+ * Get incremental execute engine.
*
- * @return job read rate limit algorithm
+ * @return incremental execute engine
*/
- JobRateLimitAlgorithm getReadRateLimitAlgorithm();
+ @SneakyThrows(ConcurrentException.class)
+ public ExecuteEngine getIncrementalExecuteEngine() {
+ return incrementalExecuteEngineLazyInitializer.get();
+ }
- /**
- * Get job write rate limit algorithm.
- *
- * @return job write rate limit algorithm
- */
- JobRateLimitAlgorithm getWriteRateLimitAlgorithm();
+ @Override
+ public void close() throws Exception {
+ shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
+ shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
+ shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
+ }
+
+ private void shutdownExecuteEngine(final
PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws
ConcurrentException {
+ if (lazyInitializer.isInitialized()) {
+ lazyInitializer.get().shutdown();
+ }
+ }
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index c70c81d0122..2daeb29de80 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
@@ -35,6 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
@@ -129,7 +129,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration
jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
- CDCProcessContext jobProcessContext =
jobOption.buildProcessContext(jobConfig);
+ TransmissionProcessContext jobProcessContext =
jobOption.buildProcessContext(jobConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig,
shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
}
@@ -156,7 +156,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
jobConfig.getDataSourceConfig().getParameter());
- CDCProcessContext processContext = new
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+ TransmissionProcessContext processContext = new
TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
JobRateLimitAlgorithm writeRateLimitAlgorithm =
processContext.getWriteRateLimitAlgorithm();
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor()
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index 331017ac3f3..496bb97b2dd 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.cdc;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -62,9 +61,9 @@ public final class CDCJobOption implements
TransmissionJobOption {
}
@Override
- public CDCProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
+ public TransmissionProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
TransmissionJobManager jobManager = new TransmissionJobManager(this);
- return new CDCProcessContext(jobConfig.getJobId(),
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
+ return new TransmissionProcessContext(jobConfig.getJobId(),
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index d07281e456b..76224a68d04 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
@@ -58,7 +59,7 @@ public final class CDCJobItemContext implements
TransmissionJobItemContext {
private final TransmissionJobItemProgress initProgress;
- private final CDCProcessContext jobProcessContext;
+ private final TransmissionProcessContext jobProcessContext;
private final CDCTaskConfiguration taskConfig;
@@ -90,7 +91,7 @@ public final class CDCJobItemContext implements
TransmissionJobItemContext {
}
};
- public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int
shardingItem, final TransmissionJobItemProgress initProgress, final
CDCProcessContext jobProcessContext,
+ public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int
shardingItem, final TransmissionJobItemProgress initProgress, final
TransmissionProcessContext jobProcessContext,
final CDCTaskConfiguration taskConfig, final
PipelineDataSourceManager dataSourceManager, final PipelineSink sink) {
this.jobConfig = jobConfig;
this.shardingItem = shardingItem;
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java
deleted file mode 100644
index 857106e90fa..00000000000
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.context;
-
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.AbstractTransmissionProcessContext;
-
-/**
- * CDC process context.
- */
-public final class CDCProcessContext extends
AbstractTransmissionProcessContext {
-
- public CDCProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
- super(jobId, originalProcessConfig);
- }
-}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 7709cb847e9..4cb7e57ea51 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -21,15 +21,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
import
org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -119,7 +119,7 @@ public final class CDCJobPreparer {
long startTimeMillis = System.currentTimeMillis();
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
- CDCProcessContext processContext =
jobItemContext.getJobProcessContext();
+ TransmissionProcessContext processContext =
jobItemContext.getJobProcessContext();
for (InventoryDumperContext each : new
InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()),
importerConfig)
.splitInventoryDumperContext(jobItemContext)) {
AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index a15cb32f87b..298a0a7706a 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -105,8 +105,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
- PipelineDataConsistencyChecker checker =
jobOption.buildDataConsistencyChecker(
- parentJobConfig,
jobOption.buildProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
+ PipelineDataConsistencyChecker checker =
jobOption.buildDataConsistencyChecker(parentJobConfig,
jobOption.buildProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap =
checker.check(checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
log.info("job {} with check algorithm '{}' data consistency
checker result: {}, stopping: {}",
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 63fdd02f23d..b92fc6eb22f 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -25,6 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfigurati
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
@@ -44,7 +45,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
@@ -83,7 +83,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
- MigrationProcessContext jobProcessContext =
jobOption.buildProcessContext(jobConfig);
+ TransmissionProcessContext jobProcessContext =
jobOption.buildProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
@@ -120,7 +120,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
private ImporterConfiguration buildImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
final
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- JobRateLimitAlgorithm writeRateLimitAlgorithm = new
MigrationProcessContext(jobConfig.getJobId(),
pipelineProcessConfig).getWriteRateLimitAlgorithm();
+ JobRateLimitAlgorithm writeRateLimitAlgorithm = new
TransmissionProcessContext(jobConfig.getJobId(),
pipelineProcessConfig).getWriteRateLimitAlgorithm();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
return new ImporterConfiguration(jobConfig.getTarget(),
shardingColumnsMap, tableAndSchemaNameMapper, batchSize,
writeRateLimitAlgorithm, retryTimes, concurrency);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 03b8f537bf4..5e70e0a5360 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -33,7 +33,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobM
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import java.util.Collection;
import java.util.LinkedList;
@@ -76,9 +75,9 @@ public final class MigrationJobOption implements
TransmissionJobOption {
}
@Override
- public MigrationProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
+ public TransmissionProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
PipelineProcessConfiguration processConfig = new
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- return new MigrationProcessContext(jobConfig.getJobId(),
processConfig);
+ return new TransmissionProcessContext(jobConfig.getJobId(),
processConfig);
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index dbffa7cb409..c2a6bb9d2a9 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -22,13 +22,14 @@ import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
@@ -72,7 +73,7 @@ public final class MigrationJobItemContext implements
TransmissionJobItemContext
private final MigrationJobConfiguration jobConfig;
- private final MigrationProcessContext jobProcessContext;
+ private final TransmissionProcessContext jobProcessContext;
private final PipelineDataSourceManager dataSourceManager;
@@ -93,7 +94,7 @@ public final class MigrationJobItemContext implements
TransmissionJobItemContext
};
public MigrationJobItemContext(final MigrationJobConfiguration jobConfig,
final int shardingItem, final TransmissionJobItemProgress initProgress,
- final MigrationProcessContext
jobProcessContext, final MigrationTaskConfiguration taskConfig, final
PipelineDataSourceManager dataSourceManager) {
+ final TransmissionProcessContext
jobProcessContext, final MigrationTaskConfiguration taskConfig, final
PipelineDataSourceManager dataSourceManager) {
this.jobConfig = jobConfig;
jobId = jobConfig.getJobId();
this.shardingItem = shardingItem;
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
deleted file mode 100644
index a382a4ca5cc..00000000000
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.migration.context;
-
-import lombok.Getter;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.context.AbstractTransmissionProcessContext;
-
-/**
- * Migration process context.
- */
-@Getter
-public final class MigrationProcessContext extends
AbstractTransmissionProcessContext {
-
- /**
- * Constructor.
- *
- * @param jobId job id
- * @param originalProcessConfig original process configuration, nullable
- */
- public MigrationProcessContext(final String jobId, final
PipelineProcessConfiguration originalProcessConfig) {
- super(jobId, originalProcessConfig);
- }
-}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index efa91b9cae2..5adb0e55c46 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swappe
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
@@ -44,7 +45,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -181,7 +181,7 @@ public final class PipelineContextUtils {
*/
public static MigrationJobItemContext mockMigrationJobItemContext(final
MigrationJobConfiguration jobConfig) {
PipelineProcessConfiguration processConfig =
mockPipelineProcessConfiguration();
- MigrationProcessContext processContext = new
MigrationProcessContext(jobConfig.getJobId(), processConfig);
+ TransmissionProcessContext processContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
int jobShardingItem = 0;
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
return new MigrationJobItemContext(jobConfig, jobShardingItem, null,
processContext, taskConfig, new DefaultPipelineDataSourceManager());
@@ -224,7 +224,7 @@ public final class PipelineContextUtils {
private static ImporterConfiguration buildImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
final
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- JobRateLimitAlgorithm writeRateLimitAlgorithm = new
MigrationProcessContext(jobConfig.getJobId(),
pipelineProcessConfig).getWriteRateLimitAlgorithm();
+ JobRateLimitAlgorithm writeRateLimitAlgorithm = new
TransmissionProcessContext(jobConfig.getJobId(),
pipelineProcessConfig).getWriteRateLimitAlgorithm();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
return new ImporterConfiguration(jobConfig.getTarget(),
shardingColumnsMap, tableAndSchemaNameMapper, batchSize,
writeRateLimitAlgorithm, retryTimes, concurrency);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index bc0d65ba749..d187c1d4713 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
@@ -28,9 +29,8 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -64,7 +64,7 @@ class MigrationDataConsistencyCheckerTest {
PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config",
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(),
0, "");
- Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
MigrationProcessContext(jobConfig.getJobId(), null),
+ Map<String, TableDataConsistencyCheckResult> actual = new
MigrationDataConsistencyChecker(jobConfig, new
TransmissionProcessContext(jobConfig.getJobId(), null),
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
null);
String checkKey = "t_order";
assertTrue(actual.get(checkKey).isMatched());