This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7fad404dfb4 Refactor InventoryDumper (#32692)
7fad404dfb4 is described below
commit 7fad404dfb4799dbf7d9f41a6f420698ad71d872
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 27 00:31:50 2024 +0800
Refactor InventoryDumper (#32692)
---
.../ingest/dumper/inventory/InventoryDumper.java | 19 +++++------
.../InventoryDataRecordPositionCreator.java | 37 ++++++++++++++++++++
...ceholderInventoryDataRecordPositionCreator.java | 36 ++++++++++++++++++++
...niqueKeyInventoryDataRecordPositionCreator.java | 39 ++++++++++++++++++++++
.../inventory/splitter/InventoryTaskSplitter.java | 6 +++-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 6 +++-
6 files changed, 130 insertions(+), 13 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 4c9fdb0ba33..414d4820a7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
import
org.apache.shardingsphere.data.pipeline.core.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.column.InventoryColumnValueReaderEngine;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
@@ -68,7 +68,7 @@ import java.util.concurrent.atomic.AtomicReference;
*/
@HighFrequencyInvocation
@Slf4j
-public class InventoryDumper extends AbstractPipelineLifecycleRunnable
implements Dumper {
+public final class InventoryDumper extends AbstractPipelineLifecycleRunnable
implements Dumper {
private final InventoryDumperContext dumperContext;
@@ -78,6 +78,8 @@ public class InventoryDumper extends
AbstractPipelineLifecycleRunnable implement
private final PipelineTableMetaDataLoader metaDataLoader;
+ private final InventoryDataRecordPositionCreator positionCreator;
+
private final PipelineInventoryDumpSQLBuilder sqlBuilder;
private final InventoryColumnValueReaderEngine columnValueReaderEngine;
@@ -86,11 +88,13 @@ public class InventoryDumper extends
AbstractPipelineLifecycleRunnable implement
private PipelineTableMetaData tableMetaData;
- public InventoryDumper(final InventoryDumperContext dumperContext, final
PipelineChannel channel, final DataSource dataSource, final
PipelineTableMetaDataLoader metaDataLoader) {
+ public InventoryDumper(final InventoryDumperContext dumperContext, final
PipelineChannel channel, final DataSource dataSource,
+ final PipelineTableMetaDataLoader metaDataLoader,
final InventoryDataRecordPositionCreator positionCreator) {
this.dumperContext = dumperContext;
this.channel = channel;
this.dataSource = dataSource;
this.metaDataLoader = metaDataLoader;
+ this.positionCreator = positionCreator;
DatabaseType databaseType =
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new
InventoryColumnValueReaderEngine(databaseType);
@@ -229,7 +233,7 @@ public class InventoryDumper extends
AbstractPipelineLifecycleRunnable implement
private DataRecord loadDataRecord(final ResultSet resultSet, final
ResultSetMetaData resultSetMetaData) throws SQLException {
int columnCount = resultSetMetaData.getColumnCount();
String tableName = dumperContext.getLogicTableName();
- DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
tableName, newDataRecordPosition(resultSet), columnCount);
+ DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT,
tableName, positionCreator.create(dumperContext, resultSet), columnCount);
List<String> insertColumnNames =
Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() ||
insertColumnNames.size() == resultSetMetaData.getColumnCount(),
() -> new PipelineInvalidParameterException("Insert column
names count not equals ResultSet column count"));
@@ -242,13 +246,6 @@ public class InventoryDumper extends
AbstractPipelineLifecycleRunnable implement
return result;
}
- protected IngestPosition newDataRecordPosition(final ResultSet resultSet)
throws SQLException {
- return dumperContext.hasUniqueKey()
- ? PrimaryKeyIngestPositionFactory.newInstance(
-
resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()),
((PrimaryKeyIngestPosition<?>)
dumperContext.getCommonContext().getPosition()).getEndValue())
- : new IngestPlaceholderPosition();
- }
-
private String buildInventoryDumpPageByPageSQL(final
InventoryQueryParameter queryParam) {
String schemaName =
dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
PipelineColumnMetaData firstColumn =
dumperContext.getUniqueKeyColumns().get(0);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/InventoryDataRecordPositionCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/InventoryDataRecordPositionCreator.java
new file mode 100644
index 00000000000..5f0d6682b85
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/InventoryDataRecordPositionCreator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public interface InventoryDataRecordPositionCreator {
+
+ /**
+ * Create inventory data record position.
+ *
+ * @param dumperContext dumper context
+ * @param resultSet result set
+ * @return created position
+ * @throws SQLException SQL exception
+ */
+ IngestPosition create(InventoryDumperContext dumperContext, ResultSet
resultSet) throws SQLException;
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/PlaceholderInventoryDataRecordPositionCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/PlaceholderInventoryDataRecordPositionCreator.java
new file mode 100644
index 00000000000..53a109db3bb
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/PlaceholderInventoryDataRecordPositionCreator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.placeholder.IngestPlaceholderPosition;
+
+import java.sql.ResultSet;
+
+/**
+ * Placeholder inventory data record position creator.
+ */
+public final class PlaceholderInventoryDataRecordPositionCreator implements
InventoryDataRecordPositionCreator {
+
+ @Override
+ public IngestPosition create(final InventoryDumperContext dumperContext,
final ResultSet resultSet) {
+ return new IngestPlaceholderPosition();
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
new file mode 100644
index 00000000000..a4d2fcf032e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/position/type/UniqueKeyInventoryDataRecordPositionCreator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type;
+
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Unique key inventory data record position creator.
+ */
+public final class UniqueKeyInventoryDataRecordPositionCreator implements
InventoryDataRecordPositionCreator {
+
+ @Override
+ public IngestPosition create(final InventoryDumperContext dumperContext,
final ResultSet resultSet) throws SQLException {
+ return PrimaryKeyIngestPositionFactory.newInstance(
+
resultSet.getObject(dumperContext.getUniqueKeyColumns().get(0).getName()),
((PrimaryKeyIngestPosition<?>)
dumperContext.getCommonContext().getPosition()).getEndValue());
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
index 034081e5203..b646d01ae8d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryTaskSplitter.java
@@ -30,6 +30,9 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsum
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.PlaceholderInventoryDataRecordPositionCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.UniqueKeyInventoryDataRecordPositionCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -65,7 +68,8 @@ public final class InventoryTaskSplitter {
for (InventoryDumperContext each :
dumperContextSplitter.split(jobItemContext)) {
AtomicReference<IngestPosition> position = new
AtomicReference<>(each.getCommonContext().getPosition());
PipelineChannel channel =
InventoryChannelCreator.create(processContext.getProcessConfiguration().getStreamChannel(),
importerConfig.getBatchSize(), position);
- Dumper dumper = new InventoryDumper(each, channel,
sourceDataSource, jobItemContext.getSourceMetaDataLoader());
+ InventoryDataRecordPositionCreator positionCreator =
each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new
PlaceholderInventoryDataRecordPositionCreator();
+ Dumper dumper = new InventoryDumper(each, channel,
sourceDataSource, jobItemContext.getSourceMetaDataLoader(), positionCreator);
Importer importer = new SingleChannelConsumerImporter(channel,
importerConfig.getBatchSize(), 3000L, jobItemContext.getSink(), jobItemContext);
result.add(new
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(), dumper, importer,
position));
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index dec0fd192d2..a51a4cbc0df 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -38,6 +38,9 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.InventoryDataRecordPositionCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.PlaceholderInventoryDataRecordPositionCreator;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.position.type.UniqueKeyInventoryDataRecordPositionCreator;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
@@ -125,7 +128,8 @@ public final class CDCJobPreparer {
if (!(position.get() instanceof IngestFinishedPosition)) {
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
}
- Dumper dumper = new InventoryDumper(each, channel,
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
+ InventoryDataRecordPositionCreator positionCreator =
each.hasUniqueKey() ? new UniqueKeyInventoryDataRecordPositionCreator() : new
PlaceholderInventoryDataRecordPositionCreator();
+ Dumper dumper = new InventoryDumper(each, channel,
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader(),
positionCreator);
Importer importer = importerUsed.get() ? null
: new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 100L, jobItemContext.getSink(), false,
importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),