This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new acca6b72b4e Rename IncrementalTaskPositionManager (#29476)
acca6b72b4e is described below

commit acca6b72b4e60bf43b791d886739ab572aa5a64a
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 22:03:36 2023 +0800

    Rename IncrementalTaskPositionManager (#29476)
    
    * Rename DataSourceCheckEngine.checkSourceDataSources()
    
    * Refactor DataSourceCheckEngine
    
    * Rename IncrementalTaskPreparer
    
    * Rename IncrementalTaskPositionManager
    
    * Rename IncrementalTaskPositionManager
---
 .../IncrementalTaskPositionManager.java}           | 63 ++++++++++++----------
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  8 +--
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  | 11 ++--
 .../migration/preparer/MigrationJobPreparer.java   |  6 +--
 4 files changed, 47 insertions(+), 41 deletions(-)
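For quick orientation, the renamed class is used like this (a minimal sketch assembled from the call sites in the diff below; the helper method and its parameter wiring are hypothetical, everything else mirrors CDCJobPreparer and MigrationJobPreparer):

    // Hypothetical helper illustrating the renamed API; the types are the ones
    // imported in the hunks below (org.apache.shardingsphere.data.pipeline.*).
    private IngestPosition resolveIncrementalPosition(final DatabaseType databaseType,
                                                      final JobItemIncrementalTasksProgress initIncremental,
                                                      final IncrementalDumperContext dumperContext,
                                                      final PipelineDataSourceManager dataSourceManager) throws SQLException {
        // The manager resolves the dialect's PositionInitializer via SPI once in its
        // constructor, then reuses the persisted position if present or asks the
        // initializer for a fresh one.
        return new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, dumperContext, dataSourceManager);
    }

On job teardown, the same class cleans up dialect-specific position state via destroyPosition(jobId, pipelineDataSourceConfig), as shown in the CDCJobAPI and MigrationJobPreparer hunks below.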

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
similarity index 56%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index e2cd721a675..c935e7fac14 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
@@ -34,67 +33,75 @@ import org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCre
 import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 
-import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.util.Optional;
 
 /**
- * Pipeline job preparer.
+ * Incremental task position manager.
  */
-@RequiredArgsConstructor
 @Slf4j
-public final class PipelineJobPreparer {
+public final class IncrementalTaskPositionManager {
     
     private final DatabaseType databaseType;
     
+    private final PositionInitializer positionInitializer;
+    
+    public IncrementalTaskPositionManager(final DatabaseType databaseType) {
+        this.databaseType = databaseType;
+        positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
+    }
+    
     /**
-     * Get incremental position.
+     * Get ingest position.
      *
     * @param initialProgress initial job item incremental tasks progress
      * @param dumperContext incremental dumper context
-     * @param dataSourceManager data source manager
+     * @param dataSourceManager pipeline data source manager
      * @return ingest position
      * @throws SQLException SQL exception
      */
-    public IngestPosition getIncrementalPosition(final JobItemIncrementalTasksProgress initialProgress, final IncrementalDumperContext dumperContext,
-                                                 final PipelineDataSourceManager dataSourceManager) throws SQLException {
+    public IngestPosition getPosition(final JobItemIncrementalTasksProgress initialProgress,
+                                      final IncrementalDumperContext dumperContext, final PipelineDataSourceManager dataSourceManager) throws SQLException {
         if (null != initialProgress) {
             Optional<IngestPosition> position = initialProgress.getIncrementalPosition();
             if (position.isPresent()) {
                 return position.get();
             }
         }
-        DataSource dataSource = dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
-        return DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType).init(dataSource, dumperContext.getJobId());
+        return positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()), dumperContext.getJobId());
     }
     
     /**
-     * Cleanup job preparer.
+     * Destroy ingest position.
      *
      * @param jobId pipeline job id
-     * @param pipelineDataSourceConfig pipeline data source config
+     * @param pipelineDataSourceConfig pipeline data source configuration
      * @throws SQLException SQL exception
      */
     public void destroyPosition(final String jobId, final PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
-        PositionInitializer positionInitializer = DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
         final long startTimeMillis = System.currentTimeMillis();
-        log.info("Cleanup database type:{}, data source type:{}", 
databaseType.getType(), pipelineDataSourceConfig.getType());
+        log.info("Cleanup position, database type: {}, pipeline data source 
type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
         if (pipelineDataSourceConfig instanceof 
ShardingSpherePipelineDataSourceConfiguration) {
-            ShardingSpherePipelineDataSourceConfiguration dataSourceConfig = (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
-            for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(dataSourceConfig.getRootConfig()).values()) {
-                try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
-                    positionInitializer.destroy(dataSource, jobId);
-                }
-            }
+            destroyPosition(jobId, (ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
+        } else if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
+            destroyPosition(jobId, (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig, positionInitializer);
         }
-        if (pipelineDataSourceConfig instanceof StandardPipelineDataSourceConfiguration) {
-            StandardPipelineDataSourceConfiguration dataSourceConfig = (StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig;
-            try (
-                    PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
-                            DataSourcePoolCreator.create((DataSourcePoolProperties) dataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+        log.info("destroyPosition cost {} ms", System.currentTimeMillis() - startTimeMillis);
+    }
+    
+    private void destroyPosition(final String jobId, final ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final PositionInitializer positionInitializer) throws SQLException {
+        for (DataSourcePoolProperties each : new YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values()) {
+            try (PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
                 positionInitializer.destroy(dataSource, jobId);
             }
         }
-        log.info("destroyPosition cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
+    }
+    
+    private void destroyPosition(final String jobId, final StandardPipelineDataSourceConfiguration pipelineDataSourceConfig, final PositionInitializer positionInitializer) throws SQLException {
+        try (
+                PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(
+                        DataSourcePoolCreator.create((DataSourcePoolProperties) pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+            positionInitializer.destroy(dataSource, jobId);
+        }
     }
 }
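Net effect of this first file: the class is renamed to match its single responsibility, the dialect-specific PositionInitializer is now looked up once via DatabaseTypedSPILoader in the constructor instead of on every getPosition()/destroyPosition() call, and the two instanceof branches of destroyPosition() are extracted into private overloads joined by an else-if, making the mutually exclusive configuration types explicit.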
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index a177080d867..59b51ab86c9 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -52,7 +52,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -201,8 +201,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         TransmissionJobItemProgress result = new TransmissionJobItemProgress();
         result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
         result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
-        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new PipelineJobPreparer(
-                incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getIncrementalPosition(null, incrementalDumperContext, dataSourceManager));
+        IncrementalTaskProgress incrementalTaskProgress = new IncrementalTaskProgress(new IncrementalTaskPositionManager(
+                incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null, incrementalDumperContext, dataSourceManager));
         result.setIncremental(new JobItemIncrementalTasksProgress(incrementalTaskProgress));
         return result;
     }
@@ -259,7 +259,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         for (Entry<String, Map<String, Object>> entry : jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
             try {
                 StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new StandardPipelineDataSourceConfiguration(entry.getValue());
-                new PipelineJobPreparer(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
+                new IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(), pipelineDataSourceConfig);
             } catch (final SQLException ex) {
                 log.warn("job destroying failed, jobId={}, dataSourceName={}", 
jobConfig.getJobId(), entry.getKey(), ex);
             }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index c9da76d64ba..51f717ec8bf 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -29,7 +29,6 @@ import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
 import org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
@@ -41,7 +40,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -106,7 +105,7 @@ public final class CDCJobPreparer {
         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
         try {
             DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
-            IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+            IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
             taskConfig.getDumperContext().getCommonContext().setPosition(position);
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
@@ -128,7 +127,7 @@ public final class CDCJobPreparer {
             Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
                     : new CDCImporter(channelProgressPairs, importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, jobItemContext.getSink(),
-                            needSorting(ImporterType.INVENTORY, hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
+                            needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
                             importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), dumper, importer, position));
@@ -139,8 +138,8 @@ public final class CDCJobPreparer {
         log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
     }
     
-    private boolean needSorting(final ImporterType importerType, final boolean hasGlobalCSN) {
-        return ImporterType.INCREMENTAL == importerType && hasGlobalCSN;
+    private boolean needSorting(final boolean hasGlobalCSN) {
+        return hasGlobalCSN;
     }
     
     private boolean hasGlobalCSN(final DatabaseType databaseType) {
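Note that the needSorting hunk above is a behavioral change, not just a cleanup: the removed needSorting(ImporterType.INVENTORY, ...) always returned false at this call site because it required ImporterType.INCREMENTAL, whereas the simplified needSorting(hasGlobalCSN) now enables sorting for the inventory CDCImporter whenever the source dialect reports a global CSN.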
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index c30eeb3aab5..edb1755ac87 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -46,7 +46,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
 import org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
@@ -185,7 +185,7 @@ public final class MigrationJobPreparer {
         JobItemIncrementalTasksProgress initIncremental = null == jobItemContext.getInitProgress() ? null : jobItemContext.getInitProgress().getIncremental();
         try {
             DatabaseType databaseType = taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
-            IngestPosition position = new PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+            IngestPosition position = new IncrementalTaskPositionManager(databaseType).getPosition(initIncremental, taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
             taskConfig.getDumperContext().getCommonContext().setPosition(position);
         } catch (final SQLException ex) {
             throw new PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
@@ -232,7 +232,7 @@ public final class MigrationJobPreparer {
     public void cleanup(final MigrationJobConfiguration jobConfig) {
         for (Entry<String, PipelineDataSourceConfiguration> entry : jobConfig.getSources().entrySet()) {
             try {
-                new PipelineJobPreparer(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
+                new IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(), entry.getValue());
             } catch (final SQLException ex) {
                 log.warn("job destroying failed, jobId={}, dataSourceName={}", jobConfig.getJobId(), entry.getKey(), ex);
             }
