This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 373a607f321 Merge TransmissionProcessContext and AbstractTransmissionProcessContext (#29254)
373a607f321 is described below

commit 373a607f3215714f7423efa4636e4cc377af6a09
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 2 00:19:54 2023 +0800

    Merge TransmissionProcessContext and AbstractTransmissionProcessContext (#29254)
---
 .../AbstractTransmissionProcessContext.java        | 122 ---------------------
 .../common/context/TransmissionProcessContext.java | 106 +++++++++++++++---
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |   6 +-
 .../data/pipeline/cdc/CDCJobOption.java            |   5 +-
 .../pipeline/cdc/context/CDCJobItemContext.java    |   5 +-
 .../pipeline/cdc/context/CDCProcessContext.java    |  31 ------
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |   6 +-
 .../task/ConsistencyCheckTasksRunner.java          |   3 +-
 .../pipeline/scenario/migration/MigrationJob.java  |   6 +-
 .../scenario/migration/MigrationJobOption.java     |   5 +-
 .../migration/context/MigrationJobItemContext.java |   7 +-
 .../migration/context/MigrationProcessContext.java |  39 -------
 .../pipeline/core/util/PipelineContextUtils.java   |   6 +-
 .../MigrationDataConsistencyCheckerTest.java       |   6 +-
 14 files changed, 115 insertions(+), 238 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java
deleted file mode 100644
index 62b28959edc..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/AbstractTransmissionProcessContext.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.context;
-
-import lombok.Getter;
-import lombok.SneakyThrows;
-import org.apache.commons.lang3.concurrent.ConcurrentException;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
-import 
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
-import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineLazyInitializer;
-import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-
-/**
- * Abstract transmission process context.
- */
-@Getter
-public abstract class AbstractTransmissionProcessContext implements 
TransmissionProcessContext {
-    
-    private final PipelineProcessConfiguration pipelineProcessConfig;
-    
-    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
-    
-    private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
-    
-    private final PipelineChannelCreator pipelineChannelCreator;
-    
-    private final PipelineLazyInitializer<ExecuteEngine> 
inventoryDumperExecuteEngineLazyInitializer;
-    
-    private final PipelineLazyInitializer<ExecuteEngine> 
inventoryImporterExecuteEngineLazyInitializer;
-    
-    private final PipelineLazyInitializer<ExecuteEngine> 
incrementalExecuteEngineLazyInitializer;
-    
-    protected AbstractTransmissionProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
-        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
-        this.pipelineProcessConfig = processConfig;
-        PipelineReadConfiguration readConfig = processConfig.getRead();
-        AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
-        readRateLimitAlgorithm = null == readRateLimiter ? null : 
TypedSPILoader.getService(JobRateLimitAlgorithm.class, 
readRateLimiter.getType(), readRateLimiter.getProps());
-        PipelineWriteConfiguration writeConfig = processConfig.getWrite();
-        AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
-        writeRateLimitAlgorithm = null == writeRateLimiter ? null : 
TypedSPILoader.getService(JobRateLimitAlgorithm.class, 
writeRateLimiter.getType(), writeRateLimiter.getProps());
-        AlgorithmConfiguration streamChannel = 
processConfig.getStreamChannel();
-        pipelineChannelCreator = 
TypedSPILoader.getService(PipelineChannelCreator.class, 
streamChannel.getType(), streamChannel.getProps());
-        inventoryDumperExecuteEngineLazyInitializer = new 
PipelineLazyInitializer<ExecuteEngine>() {
-            
-            @Override
-            protected ExecuteEngine doInitialize() {
-                return 
ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" 
+ jobId);
-            }
-        };
-        inventoryImporterExecuteEngineLazyInitializer = new 
PipelineLazyInitializer<ExecuteEngine>() {
-            
-            @Override
-            protected ExecuteEngine doInitialize() {
-                return 
ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" 
+ jobId);
-            }
-        };
-        incrementalExecuteEngineLazyInitializer = new 
PipelineLazyInitializer<ExecuteEngine>() {
-            
-            @Override
-            protected ExecuteEngine doInitialize() {
-                return ExecuteEngine.newCachedThreadInstance("Incremental-" + 
jobId);
-            }
-        };
-    }
-    
-    @Override
-    @SneakyThrows(ConcurrentException.class)
-    public ExecuteEngine getInventoryDumperExecuteEngine() {
-        return inventoryDumperExecuteEngineLazyInitializer.get();
-    }
-    
-    @Override
-    @SneakyThrows(ConcurrentException.class)
-    public ExecuteEngine getInventoryImporterExecuteEngine() {
-        return inventoryImporterExecuteEngineLazyInitializer.get();
-    }
-    
-    /**
-     * Get incremental execute engine.
-     *
-     * @return incremental execute engine
-     */
-    @SneakyThrows(ConcurrentException.class)
-    public ExecuteEngine getIncrementalExecuteEngine() {
-        return incrementalExecuteEngineLazyInitializer.get();
-    }
-    
-    @Override
-    public void close() throws Exception {
-        shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
-        shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
-        shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
-    }
-    
-    private void shutdownExecuteEngine(final 
PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws 
ConcurrentException {
-        if (lazyInitializer.isInitialized()) {
-            lazyInitializer.get().shutdown();
-        }
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
index e09b9ca9d96..8865cc72214 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/context/TransmissionProcessContext.java
@@ -17,47 +17,117 @@
 
 package org.apache.shardingsphere.data.pipeline.common.context;
 
+import lombok.Getter;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineWriteConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.channel.PipelineChannelCreator;
 import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineLazyInitializer;
+import org.apache.shardingsphere.infra.config.algorithm.AlgorithmConfiguration;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 /**
  * Transmission process context.
  */
-public interface TransmissionProcessContext extends PipelineProcessContext {
+public final class TransmissionProcessContext implements 
PipelineProcessContext {
     
-    /**
-     * Get pipeline channel creator.
-     *
-     * @return pipeline channel creator
-     */
-    PipelineChannelCreator getPipelineChannelCreator();
+    @Getter
+    private final PipelineProcessConfiguration pipelineProcessConfig;
+    
+    @Getter
+    private final JobRateLimitAlgorithm readRateLimitAlgorithm;
+    
+    @Getter
+    private final JobRateLimitAlgorithm writeRateLimitAlgorithm;
+    
+    @Getter
+    private final PipelineChannelCreator pipelineChannelCreator;
+    
+    private final PipelineLazyInitializer<ExecuteEngine> 
inventoryDumperExecuteEngineLazyInitializer;
+    
+    private final PipelineLazyInitializer<ExecuteEngine> 
inventoryImporterExecuteEngineLazyInitializer;
+    
+    private final PipelineLazyInitializer<ExecuteEngine> 
incrementalExecuteEngineLazyInitializer;
+    
+    public TransmissionProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
+        PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(originalProcessConfig);
+        this.pipelineProcessConfig = processConfig;
+        PipelineReadConfiguration readConfig = processConfig.getRead();
+        AlgorithmConfiguration readRateLimiter = readConfig.getRateLimiter();
+        readRateLimitAlgorithm = null == readRateLimiter ? null : 
TypedSPILoader.getService(JobRateLimitAlgorithm.class, 
readRateLimiter.getType(), readRateLimiter.getProps());
+        PipelineWriteConfiguration writeConfig = processConfig.getWrite();
+        AlgorithmConfiguration writeRateLimiter = writeConfig.getRateLimiter();
+        writeRateLimitAlgorithm = null == writeRateLimiter ? null : 
TypedSPILoader.getService(JobRateLimitAlgorithm.class, 
writeRateLimiter.getType(), writeRateLimiter.getProps());
+        AlgorithmConfiguration streamChannel = 
processConfig.getStreamChannel();
+        pipelineChannelCreator = 
TypedSPILoader.getService(PipelineChannelCreator.class, 
streamChannel.getType(), streamChannel.getProps());
+        inventoryDumperExecuteEngineLazyInitializer = new 
PipelineLazyInitializer<ExecuteEngine>() {
+            
+            @Override
+            protected ExecuteEngine doInitialize() {
+                return 
ExecuteEngine.newFixedThreadInstance(readConfig.getWorkerThread(), "Inventory-" 
+ jobId);
+            }
+        };
+        inventoryImporterExecuteEngineLazyInitializer = new 
PipelineLazyInitializer<ExecuteEngine>() {
+            
+            @Override
+            protected ExecuteEngine doInitialize() {
+                return 
ExecuteEngine.newFixedThreadInstance(writeConfig.getWorkerThread(), "Importer-" 
+ jobId);
+            }
+        };
+        incrementalExecuteEngineLazyInitializer = new 
PipelineLazyInitializer<ExecuteEngine>() {
+            
+            @Override
+            protected ExecuteEngine doInitialize() {
+                return ExecuteEngine.newCachedThreadInstance("Incremental-" + 
jobId);
+            }
+        };
+    }
     
     /**
      * Get inventory dumper execute engine.
      *
      * @return inventory dumper execute engine
      */
-    ExecuteEngine getInventoryDumperExecuteEngine();
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getInventoryDumperExecuteEngine() {
+        return inventoryDumperExecuteEngineLazyInitializer.get();
+    }
     
     /**
      * Get inventory importer execute engine.
      *
      * @return inventory importer execute engine
      */
-    ExecuteEngine getInventoryImporterExecuteEngine();
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getInventoryImporterExecuteEngine() {
+        return inventoryImporterExecuteEngineLazyInitializer.get();
+    }
     
     /**
-     * Get job read rate limit algorithm.
+     * Get incremental execute engine.
      *
-     * @return job read rate limit algorithm
+     * @return incremental execute engine
      */
-    JobRateLimitAlgorithm getReadRateLimitAlgorithm();
+    @SneakyThrows(ConcurrentException.class)
+    public ExecuteEngine getIncrementalExecuteEngine() {
+        return incrementalExecuteEngineLazyInitializer.get();
+    }
     
-    /**
-     * Get job write rate limit algorithm.
-     *
-     * @return job write rate limit algorithm
-     */
-    JobRateLimitAlgorithm getWriteRateLimitAlgorithm();
+    @Override
+    public void close() throws Exception {
+        shutdownExecuteEngine(inventoryDumperExecuteEngineLazyInitializer);
+        shutdownExecuteEngine(inventoryImporterExecuteEngineLazyInitializer);
+        shutdownExecuteEngine(incrementalExecuteEngineLazyInitializer);
+    }
+    
+    private void shutdownExecuteEngine(final 
PipelineLazyInitializer<ExecuteEngine> lazyInitializer) throws 
ConcurrentException {
+        if (lazyInitializer.isInitialized()) {
+            lazyInitializer.get().shutdown();
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index c70c81d0122..2daeb29de80 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguratio
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
@@ -35,6 +34,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
@@ -129,7 +129,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
-        CDCProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
+        TransmissionProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, 
shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
     }
@@ -156,7 +156,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                                                              final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
                 jobConfig.getDataSourceConfig().getParameter());
-        CDCProcessContext processContext = new 
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+        TransmissionProcessContext processContext = new 
TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
         JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index 331017ac3f3..496bb97b2dd 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.cdc;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -62,9 +61,9 @@ public final class CDCJobOption implements 
TransmissionJobOption {
     }
     
     @Override
-    public CDCProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
+    public TransmissionProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
         TransmissionJobManager jobManager = new TransmissionJobManager(this);
-        return new CDCProcessContext(jobConfig.getJobId(), 
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
+        return new TransmissionProcessContext(jobConfig.getJobId(), 
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index d07281e456b..76224a68d04 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -25,6 +25,7 @@ import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
@@ -58,7 +59,7 @@ public final class CDCJobItemContext implements 
TransmissionJobItemContext {
     
     private final TransmissionJobItemProgress initProgress;
     
-    private final CDCProcessContext jobProcessContext;
+    private final TransmissionProcessContext jobProcessContext;
     
     private final CDCTaskConfiguration taskConfig;
     
@@ -90,7 +91,7 @@ public final class CDCJobItemContext implements 
TransmissionJobItemContext {
         }
     };
     
-    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int 
shardingItem, final TransmissionJobItemProgress initProgress, final 
CDCProcessContext jobProcessContext,
+    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int 
shardingItem, final TransmissionJobItemProgress initProgress, final 
TransmissionProcessContext jobProcessContext,
                              final CDCTaskConfiguration taskConfig, final 
PipelineDataSourceManager dataSourceManager, final PipelineSink sink) {
         this.jobConfig = jobConfig;
         this.shardingItem = shardingItem;
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java
deleted file mode 100644
index 857106e90fa..00000000000
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCProcessContext.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.cdc.context;
-
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.AbstractTransmissionProcessContext;
-
-/**
- * CDC process context.
- */
-public final class CDCProcessContext extends 
AbstractTransmissionProcessContext {
-    
-    public CDCProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
-        super(jobId, originalProcessConfig);
-    }
-}
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 7709cb847e9..4cb7e57ea51 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -21,15 +21,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobOption;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCChannelProgressPair;
 import org.apache.shardingsphere.data.pipeline.cdc.core.importer.CDCImporter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCIncrementalTask;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCInventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
-import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
@@ -119,7 +119,7 @@ public final class CDCJobPreparer {
         long startTimeMillis = System.currentTimeMillis();
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         ImporterConfiguration importerConfig = taskConfig.getImporterConfig();
-        CDCProcessContext processContext = 
jobItemContext.getJobProcessContext();
+        TransmissionProcessContext processContext = 
jobItemContext.getJobProcessContext();
         for (InventoryDumperContext each : new 
InventoryTaskSplitter(jobItemContext.getSourceDataSource(), new 
InventoryDumperContext(taskConfig.getDumperContext().getCommonContext()), 
importerConfig)
                 .splitInventoryDumperContext(jobItemContext)) {
             AtomicReference<IngestPosition> position = new 
AtomicReference<>(each.getCommonContext().getPosition());
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index a15cb32f87b..298a0a7706a 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -105,8 +105,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
             try {
-                PipelineDataConsistencyChecker checker = 
jobOption.buildDataConsistencyChecker(
-                        parentJobConfig, 
jobOption.buildProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
+                PipelineDataConsistencyChecker checker = 
jobOption.buildDataConsistencyChecker(parentJobConfig, 
jobOption.buildProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
                 consistencyChecker.set(checker);
                 Map<String, TableDataConsistencyCheckResult> checkResultMap = 
checker.check(checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
                 log.info("job {} with check algorithm '{}' data consistency 
checker result: {}, stopping: {}",
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 63fdd02f23d..b92fc6eb22f 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfigurati
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
@@ -44,7 +45,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
@@ -83,7 +83,7 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
         int shardingItem = shardingContext.getShardingItem();
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
-        MigrationProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
+        TransmissionProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
     }
@@ -120,7 +120,7 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
     private ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
                                                              final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
MigrationProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
         return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, tableAndSchemaNameMapper, batchSize, 
writeRateLimitAlgorithm, retryTimes, concurrency);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 03b8f537bf4..5e70e0a5360 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -33,7 +33,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobM
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -76,9 +75,9 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
     }
     
     @Override
-    public MigrationProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
+    public TransmissionProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
         PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        return new MigrationProcessContext(jobConfig.getJobId(), 
processConfig);
+        return new TransmissionProcessContext(jobConfig.getJobId(), 
processConfig);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
index dbffa7cb409..c2a6bb9d2a9 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationJobItemContext.java
@@ -22,13 +22,14 @@ import lombok.Setter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
@@ -72,7 +73,7 @@ public final class MigrationJobItemContext implements 
TransmissionJobItemContext
     
     private final MigrationJobConfiguration jobConfig;
     
-    private final MigrationProcessContext jobProcessContext;
+    private final TransmissionProcessContext jobProcessContext;
     
     private final PipelineDataSourceManager dataSourceManager;
     
@@ -93,7 +94,7 @@ public final class MigrationJobItemContext implements 
TransmissionJobItemContext
     };
     
     public MigrationJobItemContext(final MigrationJobConfiguration jobConfig, 
final int shardingItem, final TransmissionJobItemProgress initProgress,
-                                   final MigrationProcessContext 
jobProcessContext, final MigrationTaskConfiguration taskConfig, final 
PipelineDataSourceManager dataSourceManager) {
+                                   final TransmissionProcessContext 
jobProcessContext, final MigrationTaskConfiguration taskConfig, final 
PipelineDataSourceManager dataSourceManager) {
         this.jobConfig = jobConfig;
         jobId = jobConfig.getJobId();
         this.shardingItem = shardingItem;
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
deleted file mode 100644
index a382a4ca5cc..00000000000
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/context/MigrationProcessContext.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.scenario.migration.context;
-
-import lombok.Getter;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.context.AbstractTransmissionProcessContext;
-
-/**
- * Migration process context.
- */
-@Getter
-public final class MigrationProcessContext extends 
AbstractTransmissionProcessContext {
-    
-    /**
-     * Constructor.
-     *
-     * @param jobId job id
-     * @param originalProcessConfig original process configuration, nullable
-     */
-    public MigrationProcessContext(final String jobId, final 
PipelineProcessConfiguration originalProcessConfig) {
-        super(jobId, originalProcessConfig);
-    }
-}
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index efa91b9cae2..5adb0e55c46 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -30,6 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swappe
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
@@ -44,7 +45,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import 
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
@@ -181,7 +181,7 @@ public final class PipelineContextUtils {
      */
     public static MigrationJobItemContext mockMigrationJobItemContext(final 
MigrationJobConfiguration jobConfig) {
         PipelineProcessConfiguration processConfig = 
mockPipelineProcessConfiguration();
-        MigrationProcessContext processContext = new 
MigrationProcessContext(jobConfig.getJobId(), processConfig);
+        TransmissionProcessContext processContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         int jobShardingItem = 0;
         MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
         return new MigrationJobItemContext(jobConfig, jobShardingItem, null, 
processContext, taskConfig, new DefaultPipelineDataSourceManager());
@@ -224,7 +224,7 @@ public final class PipelineContextUtils {
     private static ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
                                                                     final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
MigrationProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
         return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, tableAndSchemaNameMapper, batchSize, 
writeRateLimitAlgorithm, retryTimes, concurrency);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index bc0d65ba749..d187c1d4713 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.check
 
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
@@ -28,9 +29,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -64,7 +64,7 @@ class MigrationDataConsistencyCheckerTest {
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey());
         
getClusterPersistRepository().persist(String.format("/pipeline/jobs/%s/config", 
jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         
governanceFacade.getJobItemFacade().getProcess().persist(jobConfig.getJobId(), 
0, "");
-        Map<String, TableDataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(jobConfig, new 
MigrationProcessContext(jobConfig.getJobId(), null),
+        Map<String, TableDataConsistencyCheckResult> actual = new 
MigrationDataConsistencyChecker(jobConfig, new 
TransmissionProcessContext(jobConfig.getJobId(), null),
                 
createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE",
 null);
         String checkKey = "t_order";
         assertTrue(actual.get(checkKey).isMatched());


Reply via email to