This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6b3573552be Refactor InventoryDumper (#32691)
6b3573552be is described below
commit 6b3573552be9d11e896bcbb2dbb4bcd577bae31f
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Aug 26 23:25:11 2024 +0800
Refactor InventoryDumper (#32691)
---
.../ingest/dumper/inventory/InventoryDumper.java | 20 ++++++++------------
1 file changed, 8 insertions(+), 12 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
index 030ab4835e2..4c9fdb0ba33 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/inventory/InventoryDumper.java
@@ -18,8 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory;
import com.google.common.base.Strings;
-import lombok.AccessLevel;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
@@ -72,7 +70,6 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class InventoryDumper extends AbstractPipelineLifecycleRunnable implements Dumper {
- @Getter(AccessLevel.PROTECTED)
private final InventoryDumperContext dumperContext;
private final PipelineChannel channel;
@@ -81,7 +78,7 @@ public class InventoryDumper extends AbstractPipelineLifecycleRunnable implement
private final PipelineTableMetaDataLoader metaDataLoader;
- private final PipelineInventoryDumpSQLBuilder inventoryDumpSQLBuilder;
+ private final PipelineInventoryDumpSQLBuilder sqlBuilder;
private final InventoryColumnValueReaderEngine columnValueReaderEngine;
@@ -95,7 +92,7 @@ public class InventoryDumper extends AbstractPipelineLifecycleRunnable implement
this.dataSource = dataSource;
this.metaDataLoader = metaDataLoader;
DatabaseType databaseType = dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
- inventoryDumpSQLBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
+ sqlBuilder = new PipelineInventoryDumpSQLBuilder(databaseType);
columnValueReaderEngine = new InventoryColumnValueReaderEngine(databaseType);
}
@@ -231,7 +228,8 @@ public class InventoryDumper extends AbstractPipelineLifecycleRunnable implement
private DataRecord loadDataRecord(final ResultSet resultSet, final ResultSetMetaData resultSetMetaData) throws SQLException {
int columnCount = resultSetMetaData.getColumnCount();
- DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, dumperContext.getLogicTableName(), newDataRecordPosition(resultSet), columnCount);
+ String tableName = dumperContext.getLogicTableName();
+ DataRecord result = new DataRecord(PipelineSQLOperationType.INSERT, tableName, newDataRecordPosition(resultSet), columnCount);
List<String> insertColumnNames = Optional.ofNullable(dumperContext.getInsertColumnNames()).orElse(Collections.emptyList());
ShardingSpherePreconditions.checkState(insertColumnNames.isEmpty() || insertColumnNames.size() == resultSetMetaData.getColumnCount(),
() -> new PipelineInvalidParameterException("Insert column names count not equals ResultSet column count"));
@@ -256,17 +254,15 @@ public class InventoryDumper extends AbstractPipelineLifecycleRunnable implement
PipelineColumnMetaData firstColumn = dumperContext.getUniqueKeyColumns().get(0);
List<String> columnNames = dumperContext.getQueryColumnNames();
if (QueryType.POINT_QUERY == queryParam.getQueryType()) {
- return inventoryDumpSQLBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
+ return sqlBuilder.buildPointQuerySQL(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName());
}
QueryRange queryRange = queryParam.getUniqueKeyValueRange();
boolean lowerInclusive = queryRange.isLowerInclusive();
if (null != queryRange.getLower() && null != queryRange.getUpper()) {
- return inventoryDumpSQLBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(
- schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true));
+ return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, true));
}
if (null != queryRange.getLower()) {
- return inventoryDumpSQLBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(
- schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false));
+ return sqlBuilder.buildDivisibleSQL(new BuildDivisibleSQLParameter(schemaName, dumperContext.getActualTableName(), columnNames, firstColumn.getName(), lowerInclusive, false));
}
throw new PipelineInternalException("Primary key position is invalid.");
}
@@ -323,7 +319,7 @@ public class InventoryDumper extends AbstractPipelineLifecycleRunnable implement
}
String schemaName = dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
List<String> columnNames = dumperContext.getQueryColumnNames();
- return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
+ return sqlBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName(), columnNames);
}
@Override