This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e664f0e4deb Refactor DumperConfiguration (#28921)
e664f0e4deb is described below
commit e664f0e4deb455e73341ebc478995eb34598eeb7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 2 23:08:30 2023 +0800
Refactor DumperConfiguration (#28921)
* Refactor DumperConfiguration
* Refactor DumperConfiguration
* Refactor DumperConfiguration
* Refactor DumperConfiguration
---
.../api/config/ingest/DumperConfiguration.java | 27 ++++++++++------------
.../PipelineInventoryDumpSQLBuilder.java | 10 ++++----
.../data/pipeline/core/dumper/InventoryDumper.java | 3 ++-
.../mysql/ingest/MySQLIncrementalDumper.java | 18 +++++++--------
.../postgresql/ingest/wal/WALEventConverter.java | 10 ++++----
.../ingest/wal/WALEventConverterTest.java | 6 ++---
6 files changed, 36 insertions(+), 38 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 474ea9cc8be..4d8c81f83a8 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -28,10 +28,10 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -58,7 +58,7 @@ public class DumperConfiguration {
// LinkedHashSet is required
@Getter(AccessLevel.PROTECTED)
- private Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap;
+ private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap
= new HashMap<>();
private boolean decodeWithTX;
@@ -112,22 +112,19 @@ public class DumperConfiguration {
* @param logicTableName logic table name
* @return column names
*/
- public Optional<List<String>> getColumnNames(final LogicTableName
logicTableName) {
- Set<ColumnName> columnNames = null == targetTableColumnsMap ? null :
targetTableColumnsMap.get(logicTableName);
- if (null == columnNames) {
- return Optional.empty();
- }
- return
Optional.of(columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList()));
+ public Collection<String> getColumnNames(final LogicTableName
logicTableName) {
+ return targetTableColumnsMap.containsKey(logicTableName)
+ ?
targetTableColumnsMap.get(logicTableName).stream().map(ColumnName::getOriginal).collect(Collectors.toList())
+ : Collections.singleton("*");
}
/**
- * Get column name set of table.
+ * Get column names.
*
* @param actualTableName actual table name
- * @return column names of table
+ * @return column names
*/
- public Optional<Set<ColumnName>> getColumnNameSet(final String
actualTableName) {
- Set<ColumnName> result = null == targetTableColumnsMap ? null :
targetTableColumnsMap.get(getLogicTableName(actualTableName));
- return Optional.ofNullable(result);
+ public Collection<ColumnName> getColumnNames(final String actualTableName)
{
+ return
targetTableColumnsMap.getOrDefault(getLogicTableName(actualTableName),
Collections.emptySet());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
index 81a9711856a..395d98cacf2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import java.util.List;
+import java.util.Collection;
import java.util.stream.Collectors;
/**
@@ -42,7 +42,7 @@ public final class PipelineInventoryDumpSQLBuilder {
* @param uniqueKey unique key
* @return built SQL
*/
- public String buildDivisibleSQL(final String schemaName, final String
tableName, final List<String> columnNames, final String uniqueKey) {
+ public String buildDivisibleSQL(final String schemaName, final String
tableName, final Collection<String> columnNames, final String uniqueKey) {
String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
String escapedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY
%s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey,
escapedUniqueKey, escapedUniqueKey);
@@ -57,7 +57,7 @@ public final class PipelineInventoryDumpSQLBuilder {
* @param uniqueKey unique key
* @return built SQL
*/
- public String buildUnlimitedDivisibleSQL(final String schemaName, final
String tableName, final List<String> columnNames, final String uniqueKey) {
+ public String buildUnlimitedDivisibleSQL(final String schemaName, final
String tableName, final Collection<String> columnNames, final String uniqueKey)
{
String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
String escapedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey,
escapedUniqueKey);
@@ -72,13 +72,13 @@ public final class PipelineInventoryDumpSQLBuilder {
* @param uniqueKey unique key
* @return built SQL
*/
- public String buildIndivisibleSQL(final String schemaName, final String
tableName, final List<String> columnNames, final String uniqueKey) {
+ public String buildIndivisibleSQL(final String schemaName, final String
tableName, final Collection<String> columnNames, final String uniqueKey) {
String qualifiedTableName =
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
String quotedUniqueKey =
sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
return String.format("SELECT %s FROM %s ORDER BY %s ASC",
buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
}
- private String buildQueryColumns(final List<String> columnNames) {
+ private String buildQueryColumns(final Collection<String> columnNames) {
return
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 31e80d750d5..23462cdf1db 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -57,6 +57,7 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -162,7 +163,7 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
}
PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>)
dumperConfig.getPosition();
PipelineColumnMetaData firstColumn =
dumperConfig.getUniqueKeyColumns().get(0);
- List<String> columnNames =
dumperConfig.getColumnNames(logicTableName).orElse(Collections.singletonList("*"));
+ Collection<String> columnNames =
dumperConfig.getColumnNames(logicTableName);
if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) ||
PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
if (null != primaryKeyPosition.getBeginValue() && null !=
primaryKeyPosition.getEndValue()) {
return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName,
dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 6c5cde7993a..c41f2ae9e33 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -55,12 +55,12 @@ import
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.io.Serializable;
import java.nio.charset.Charset;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
/**
* MySQL incremental dumper.
@@ -161,13 +161,13 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
}
private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event,
final PipelineTableMetaData tableMetaData) {
- Set<ColumnName> columnNameSet =
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+ Collection<ColumnName> columnNames =
dumperConfig.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getAfterRows()) {
DataRecord dataRecord =
createDataRecord(IngestDataChangeType.INSERT, event, each.length);
for (int i = 0; i < each.length; i++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
- if (isColumnUnneeded(columnNameSet, columnMetaData.getName()))
{
+ if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
@@ -177,12 +177,12 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
return result;
}
- private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet,
final String columnName) {
- return null != columnNameSet && !columnNameSet.contains(new
ColumnName(columnName));
+ private boolean isColumnUnneeded(final Collection<ColumnName> columnNames,
final String columnName) {
+ return !columnNames.isEmpty() && !columnNames.contains(new
ColumnName(columnName));
}
private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent
event, final PipelineTableMetaData tableMetaData) {
- Set<ColumnName> columnNameSet =
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+ Collection<ColumnName> columnNames =
dumperConfig.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (int i = 0; i < event.getBeforeRows().size(); i++) {
Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -193,7 +193,7 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
Serializable newValue = afterValues[j];
boolean updated = !Objects.equals(newValue, oldValue);
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(j + 1);
- if (isColumnUnneeded(columnNameSet, columnMetaData.getName()))
{
+ if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(),
@@ -206,13 +206,13 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
}
private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent
event, final PipelineTableMetaData tableMetaData) {
- Set<ColumnName> columnNameSet =
dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+ Collection<ColumnName> columnNames =
dumperConfig.getColumnNames(event.getTableName());
List<DataRecord> result = new LinkedList<>();
for (Serializable[] each : event.getBeforeRows()) {
DataRecord dataRecord =
createDataRecord(IngestDataChangeType.DELETE, event, each.length);
for (int i = 0, length = each.length; i < length; i++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
- if (isColumnUnneeded(columnNameSet, columnMetaData.getName()))
{
+ if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
dataRecord.addColumn(new Column(columnMetaData.getName(),
handleValue(columnMetaData, each[i]), null, true,
columnMetaData.isUniqueKey()));
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index bd261596024..5703ed68ce0 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -35,8 +35,8 @@ import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.Updat
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
+import java.util.Collection;
import java.util.List;
-import java.util.Set;
/**
* WAL event converter.
@@ -128,10 +128,10 @@ public final class WALEventConverter {
}
private void putColumnsIntoDataRecord(final DataRecord dataRecord, final
PipelineTableMetaData tableMetaData, final String actualTableName, final
List<Object> values) {
- Set<ColumnName> columnNameSet =
dumperConfig.getColumnNameSet(actualTableName).orElse(null);
+ Collection<ColumnName> columnNames =
dumperConfig.getColumnNames(actualTableName);
for (int i = 0, count = values.size(); i < count; i++) {
PipelineColumnMetaData columnMetaData =
tableMetaData.getColumnMetaData(i + 1);
- if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+ if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
continue;
}
boolean isUniqueKey = columnMetaData.isUniqueKey();
@@ -141,7 +141,7 @@ public final class WALEventConverter {
}
}
- private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet,
final String columnName) {
- return null != columnNameSet && !columnNameSet.contains(new
ColumnName(columnName));
+ private boolean isColumnUnneeded(final Collection<ColumnName> columnNames,
final String columnName) {
+ return !columnNames.isEmpty() && !columnNames.contains(new
ColumnName(columnName));
}
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 1cd12fdb8b8..451637bd877 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -54,11 +54,11 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -131,7 +131,7 @@ class WALEventConverterTest {
assertWriteRowEvent0(mockTargetTableColumnsMap(), 1);
}
- private void assertWriteRowEvent0(final Map<LogicTableName,
Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws
ReflectiveOperationException {
+ private void assertWriteRowEvent0(final Map<LogicTableName,
Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount)
throws ReflectiveOperationException {
dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
WriteRowEvent rowsEvent = new WriteRowEvent();
rowsEvent.setSchemaName("");
@@ -143,7 +143,7 @@ class WALEventConverterTest {
assertThat(actual.getColumnCount(), is(expectedColumnCount));
}
- private Map<LogicTableName, Set<ColumnName>> mockTargetTableColumnsMap() {
+ private Map<LogicTableName, Collection<ColumnName>>
mockTargetTableColumnsMap() {
return Collections.singletonMap(new LogicTableName("t_order"),
Collections.singleton(new ColumnName("order_id")));
}