This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new acca6b72b4e Rename IncrementalTaskPositionManager (#29476)
acca6b72b4e is described below
commit acca6b72b4e60bf43b791d886739ab572aa5a64a
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Dec 20 22:03:36 2023 +0800
Rename IncrementalTaskPositionManager (#29476)
* Rename DataSourceCheckEngine.checkSourceDataSources()
* Refactor DataSourceCheckEngine
* Rename IncrementalTaskPreparer
* Rename IncrementalTaskPositionManager
* Rename IncrementalTaskPositionManager
---
.../IncrementalTaskPositionManager.java} | 63 ++++++++++++----------
.../data/pipeline/cdc/api/CDCJobAPI.java | 8 +--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 11 ++--
.../migration/preparer/MigrationJobPreparer.java | 6 +--
4 files changed, 47 insertions(+), 41 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
similarity index 56%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index e2cd721a675..c935e7fac14 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -15,9 +15,8 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
@@ -34,67 +33,75 @@ import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCre
import
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.Optional;
/**
- * Pipeline job preparer.
+ * Incremental task position manager.
*/
-@RequiredArgsConstructor
@Slf4j
-public final class PipelineJobPreparer {
+public final class IncrementalTaskPositionManager {
private final DatabaseType databaseType;
+ private final PositionInitializer positionInitializer;
+
+ public IncrementalTaskPositionManager(final DatabaseType databaseType) {
+ this.databaseType = databaseType;
+ positionInitializer =
DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
+ }
+
/**
- * Get incremental position.
+ * Get ingest position.
*
* @param initialProgress initial iob item incremental tasks progress
* @param dumperContext incremental dumper context
- * @param dataSourceManager data source manager
+ * @param dataSourceManager pipeline data source manager
* @return ingest position
* @throws SQLException SQL exception
*/
- public IngestPosition getIncrementalPosition(final
JobItemIncrementalTasksProgress initialProgress, final IncrementalDumperContext
dumperContext,
- final
PipelineDataSourceManager dataSourceManager) throws SQLException {
+ public IngestPosition getPosition(final JobItemIncrementalTasksProgress
initialProgress,
+ final IncrementalDumperContext
dumperContext, final PipelineDataSourceManager dataSourceManager) throws
SQLException {
if (null != initialProgress) {
Optional<IngestPosition> position =
initialProgress.getIncrementalPosition();
if (position.isPresent()) {
return position.get();
}
}
- DataSource dataSource =
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
- return DatabaseTypedSPILoader.getService(PositionInitializer.class,
databaseType).init(dataSource, dumperContext.getJobId());
+ return
positionInitializer.init(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig()),
dumperContext.getJobId());
}
/**
- * Cleanup job preparer.
+ * Destroy ingest position.
*
* @param jobId pipeline job id
- * @param pipelineDataSourceConfig pipeline data source config
+ * @param pipelineDataSourceConfig pipeline data source configuration
* @throws SQLException SQL exception
*/
public void destroyPosition(final String jobId, final
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
- PositionInitializer positionInitializer =
DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
final long startTimeMillis = System.currentTimeMillis();
- log.info("Cleanup database type:{}, data source type:{}",
databaseType.getType(), pipelineDataSourceConfig.getType());
+ log.info("Cleanup position, database type: {}, pipeline data source
type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof
ShardingSpherePipelineDataSourceConfiguration) {
- ShardingSpherePipelineDataSourceConfiguration dataSourceConfig =
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig;
- for (DataSourcePoolProperties each : new
YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(dataSourceConfig.getRootConfig()).values())
{
- try (PipelineDataSourceWrapper dataSource = new
PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
- positionInitializer.destroy(dataSource, jobId);
- }
- }
+ destroyPosition(jobId,
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig,
positionInitializer);
+ } else if (pipelineDataSourceConfig instanceof
StandardPipelineDataSourceConfiguration) {
+ destroyPosition(jobId, (StandardPipelineDataSourceConfiguration)
pipelineDataSourceConfig, positionInitializer);
}
- if (pipelineDataSourceConfig instanceof
StandardPipelineDataSourceConfiguration) {
- StandardPipelineDataSourceConfiguration dataSourceConfig =
(StandardPipelineDataSourceConfiguration) pipelineDataSourceConfig;
- try (
- PipelineDataSourceWrapper dataSource = new
PipelineDataSourceWrapper(
-
DataSourcePoolCreator.create((DataSourcePoolProperties)
dataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+ log.info("destroyPosition cost {} ms", System.currentTimeMillis() -
startTimeMillis);
+ }
+
+ private void destroyPosition(final String jobId, final
ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final
PositionInitializer positionInitializer) throws SQLException {
+ for (DataSourcePoolProperties each : new
YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values())
{
+ try (PipelineDataSourceWrapper dataSource = new
PipelineDataSourceWrapper(DataSourcePoolCreator.create(each), databaseType)) {
positionInitializer.destroy(dataSource, jobId);
}
}
- log.info("destroyPosition cost {} ms", System.currentTimeMillis() -
startTimeMillis);
+ }
+
+ private void destroyPosition(final String jobId, final
StandardPipelineDataSourceConfiguration pipelineDataSourceConfig, final
PositionInitializer positionInitializer) throws SQLException {
+ try (
+ PipelineDataSourceWrapper dataSource = new
PipelineDataSourceWrapper(
+
DataSourcePoolCreator.create((DataSourcePoolProperties)
pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+ positionInitializer.destroy(dataSource, jobId);
+ }
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index a177080d867..59b51ab86c9 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -52,7 +52,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -201,8 +201,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
TransmissionJobItemProgress result = new TransmissionJobItemProgress();
result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
- IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(new PipelineJobPreparer(
-
incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getIncrementalPosition(null,
incrementalDumperContext, dataSourceManager));
+ IncrementalTaskProgress incrementalTaskProgress = new
IncrementalTaskProgress(new IncrementalTaskPositionManager(
+
incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getPosition(null,
incrementalDumperContext, dataSourceManager));
result.setIncremental(new
JobItemIncrementalTasksProgress(incrementalTaskProgress));
return result;
}
@@ -259,7 +259,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
for (Entry<String, Map<String, Object>> entry :
jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
try {
StandardPipelineDataSourceConfiguration
pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(entry.getValue());
- new
PipelineJobPreparer(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(),
pipelineDataSourceConfig);
+ new
IncrementalTaskPositionManager(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(),
pipelineDataSourceConfig);
} catch (final SQLException ex) {
log.warn("job destroying failed, jobId={}, dataSourceName={}",
jobConfig.getJobId(), entry.getKey(), ex);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index c9da76d64ba..51f717ec8bf 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -29,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessC
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
@@ -41,7 +40,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -106,7 +105,7 @@ public final class CDCJobPreparer {
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
try {
DatabaseType databaseType =
taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
- IngestPosition position = new
PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+ IngestPosition position = new
IncrementalTaskPositionManager(databaseType).getPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
taskConfig.getDumperContext().getCommonContext().setPosition(position);
} catch (final SQLException ex) {
throw new
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
@@ -128,7 +127,7 @@ public final class CDCJobPreparer {
Dumper dumper = new InventoryDumper(each, channel,
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS,
jobItemContext.getSink(),
- needSorting(ImporterType.INVENTORY,
hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
+
needSorting(hasGlobalCSN(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())),
importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(),
dumper, importer, position));
@@ -139,8 +138,8 @@ public final class CDCJobPreparer {
log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() -
startTimeMillis);
}
- private boolean needSorting(final ImporterType importerType, final boolean
hasGlobalCSN) {
- return ImporterType.INCREMENTAL == importerType && hasGlobalCSN;
+ private boolean needSorting(final boolean hasGlobalCSN) {
+ return hasGlobalCSN;
}
private boolean hasGlobalCSN(final DatabaseType databaseType) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index c30eeb3aab5..edb1755ac87 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -46,7 +46,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
+import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
@@ -185,7 +185,7 @@ public final class MigrationJobPreparer {
JobItemIncrementalTasksProgress initIncremental = null ==
jobItemContext.getInitProgress() ? null :
jobItemContext.getInitProgress().getIncremental();
try {
DatabaseType databaseType =
taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType();
- IngestPosition position = new
PipelineJobPreparer(databaseType).getIncrementalPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
+ IngestPosition position = new
IncrementalTaskPositionManager(databaseType).getPosition(initIncremental,
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager());
taskConfig.getDumperContext().getCommonContext().setPosition(position);
} catch (final SQLException ex) {
throw new
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
@@ -232,7 +232,7 @@ public final class MigrationJobPreparer {
public void cleanup(final MigrationJobConfiguration jobConfig) {
for (Entry<String, PipelineDataSourceConfiguration> entry :
jobConfig.getSources().entrySet()) {
try {
- new
PipelineJobPreparer(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(),
entry.getValue());
+ new
IncrementalTaskPositionManager(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(),
entry.getValue());
} catch (final SQLException ex) {
log.warn("job destroying failed, jobId={}, dataSourceName={}",
jobConfig.getJobId(), entry.getKey(), ex);
}