This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fad404dfb4 Refactor InventoryDumper (#32692)
7fad404dfb4 is described below

commit 7fad404dfb4799dbf7d9f41a6f420698ad71d872
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 27 00:31:50 2024 +0800

    Refactor InventoryDumper (#32692)
---
 .../ingest/dumper/inventory/InventoryDumper.java   | 19 +++++------
 .../InventoryDataRecordPositionCreator.java        | 37 ++++++++++++++++++++
 ...ceholderInventoryDataRecordPositionCreator.java | 36 ++++++++++++++++++++
 ...niqueKeyInventoryDataRecordPositionCreator.java | 39 ++++++++++++++++++++++
 .../inventory/splitter/InventoryTaskSplitter.java  |  6 +++-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  6 +++-
 6 files changed, 130 insertions(+), 13 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 4c9fdb0ba33..414d4820a7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
 import 
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
@@ -68,7 +68,7 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 @HighFrequencyInvocation
 @Slf4j
-public class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
+public final class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
     
     private final InventoryDumperContext dumperContext;
     
@@ -78,6 +78,8 @@ public class InventoryDumper extends 
AbstractPipelineLifecycleRunnable implement
     
     private final PipelineTableMetaDataLoader metaDataLoader;
     
+    private final InventoryDataRecordPositionCreator positionCreator;
+    
     private final PipelineInventoryDumpSQLBuilder sqlBuilder;
     
     private final InventoryColumnValueReaderEngine columnValueReaderEngine;
@@ -86,11 +88,13 @@ public class InventoryDumper extends 
AbstractPipelineLifecycleRunnable implement
     
     private PipelineTableMetaData tableMetaData;
     
-    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource, final PipelineTableMetaDataLoader metaDataLoader) {
+    public InventoryDumper(final InventoryDumperContext dumperContext, final PipelineChannel channel, final DataSource dataSource,
+                           final PipelineTableMetaDataLoader metaDataLoader, final InventoryDataRecordPositionCreator positionCreator) {
         this.dumperContext = dumperContext;
         this.channel = channel;
         this.dataSource = dataSource;
         this.metaDataLoader = metaDataLoader;
+        this.positionCreator = positionCreator;
         DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
         columnValueReaderEngine = new 
InventoryColumnValueReaderEngine(databaseType);
@@ -229,7 +233,7 @@ public class InventoryDumper extends 
AbstractPipelineLifecycleRunnable implement
     private DataRecord loadDataRecord(final ResultSet resultSet, final 
ResultSetMetaData resultSetMetaData) throws SQLException {
         int columnCount = resultSetMetaData.getColumnCount();
         String tableName = dumperContext.getLogicTableName();
-        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, newDataRecordPosition(resultSet), columnCount);
+        DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, positionCreator.create(dumperContext, resultSet), columnCount);
         List<String> insertColumnNames = 
Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
         ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || 
insertColumnNames.size() == resultSetMetaData.getColumnCount(),
                 () -> new PipelineInvalidParameterException("Insert column 
names count not equals ResultSet column count"));
@@ -242,13 +246,6 @@ public class InventoryDumper extends 
AbstractPipelineLifecycleRunnable implement
         return result;
     }
     
-    protected IngestPosition newDataRecordPosition(final ResultSet resultSet) throws SQLException {
-        return dumperContext.hasUniqueKey()
-                ? PrimaryKeyIngestPositionFactory.newInstance(
-                        resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue())
-                : new IngestPlaceholderPosition();
-    }
-    
     private String buildInventoryDumpPageByPageSQL(final 
InventoryQueryParameter queryParam) {
         String schemaName = 
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         PipelineColumnMetaData firstColumn = 
dumperContext.getUniqueKeyColumns().get(0);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/InventoryDataRecordPositionCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/InventoryDataRecordPositionCreator.java
new file mode 100644
index 00000000000..5f0d6682b85
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/InventoryDataRecordPositionCreator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position;
+
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public interface InventoryDataRecordPositionCreator {
+    
+    /**
+     * Create inventory data record position.
+     *
+     * @param dumperContext dumper context
+     * @param resultSet result set
+     * @return created position
+     * @throws SQLException SQL exception
+     */
+    IngestPosition create(InventoryDumperContext dumperContext, ResultSet resultSet) throws SQLException;
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/PlaceholderInventoryDataRecordPositionCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/PlaceholderInventoryDataRecordPositionCreator.java
new file mode 100644
index 00000000000..53a109db3bb
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/PlaceholderInventoryDataRecordPositionCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type;
+
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+
+import java.sql.ResultSet;
+
+/**
+ * Placeholder inventory data record position creator.
+ */
+public final class PlaceholderInventoryDataRecordPositionCreator implements InventoryDataRecordPositionCreator {
+    
+    @Override
+    public IngestPosition create(final InventoryDumperContext dumperContext, final ResultSet resultSet) {
+        return new IngestPlaceholderPosition();
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
new file mode 100644
index 00000000000..a4d2fcf032e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type;
+
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Unique key inventory data record position creator.
+ */
+public final class UniqueKeyInventoryDataRecordPositionCreator implements InventoryDataRecordPositionCreator {
+    
+    @Override
+    public IngestPosition create(final InventoryDumperContext dumperContext, final ResultSet resultSet) throws SQLException {
+        return PrimaryKeyIngestPositionFactory.newInstance(
+                resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()), ((PrimaryKeyIngestPosition<?>) dumperContext.getCommonContext().getPosition()).getEndValue());
+    }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index 034081e5203..b646d01ae8d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -30,6 +30,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsum
 import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.PlaceholderInventoryDataRecordPositionCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.UniqueKeyInventoryDataRecordPositionCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -65,7 +68,8 @@ public final class InventoryTaskSplitter {
         for (InventoryDumperContext each : 
dumperContextSplitter.split(jobItemContext)) {
             AtomicReference<IngestPosition> position = new 
AtomicReference<>(each.getCommonContext().getPosition());
             PipelineChannel channel = 
InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(),
 importerConfig.getBatchSize(), position);
-            Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader());
+            InventoryDataRecordPositionCreator positionCreator = each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new PlaceholderInventoryDataRecordPositionCreator();
+            Dumper dumper = new InventoryDumper(each, channel, sourceDataSource, jobItemContext.getSourceMetaDataLoader(), positionCreator);
             Importer importer = new SingleChannelConsumerImporter(channel, 
importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
             result.add(new 
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
                     processContext.getInventoryDumperExecuteEngine(), 
processContext.getInventoryImporterExecuteEngine(), dumper, importer, 
position));
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index dec0fd192d2..a51a4cbc0df 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -38,6 +38,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.PlaceholderInventoryDataRecordPositionCreator;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.UniqueKeyInventoryDataRecordPositionCreator;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
@@ -125,7 +128,8 @@ public final class CDCJobPreparer {
             if (!(position.get() instanceof IngestFinishedPosition)) {
                 channelProgressPairs.add(new CDCChannelProgressPair(channel, 
jobItemContext));
             }
-            Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
+            InventoryDataRecordPositionCreator positionCreator = each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new PlaceholderInventoryDataRecordPositionCreator();
+            Dumper dumper = new InventoryDumper(each, channel, jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader(), positionCreator);
             Importer importer = importerUsed.get() ? null
                     : new CDCImporter(channelProgressPairs, 
importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false, 
importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new 
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), 
processContext.getInventoryDumperExecuteEngine(),

Reply via email to