This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e664f0e4deb Refactor DumperConfiguration (#28921)
e664f0e4deb is described below

commit e664f0e4deb455e73341ebc478995eb34598eeb7
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 2 23:08:30 2023 +0800

    Refactor DumperConfiguration (#28921)
    
    * Refactor DumperConfiguration
    
    * Refactor DumperConfiguration
    
    * Refactor DumperConfiguration
    
    * Refactor DumperConfiguration
---
 .../api/config/ingest/DumperConfiguration.java     | 27 ++++++++++------------
 .../PipelineInventoryDumpSQLBuilder.java           | 10 ++++----
 .../data/pipeline/core/dumper/InventoryDumper.java |  3 ++-
 .../mysql/ingest/MySQLIncrementalDumper.java       | 18 +++++++--------
 .../postgresql/ingest/wal/WALEventConverter.java   | 10 ++++----
 .../ingest/wal/WALEventConverterTest.java          |  6 ++---
 6 files changed, 36 insertions(+), 38 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
index 474ea9cc8be..4d8c81f83a8 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/config/ingest/DumperConfiguration.java
@@ -28,10 +28,10 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 
-import java.util.List;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -58,7 +58,7 @@ public class DumperConfiguration {
     
     // LinkedHashSet is required
     @Getter(AccessLevel.PROTECTED)
-    private Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap;
+    private Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap = new HashMap<>();
     
     private boolean decodeWithTX;
     
@@ -112,22 +112,19 @@ public class DumperConfiguration {
      * @param logicTableName logic table name
      * @return column names
      */
-    public Optional<List<String>> getColumnNames(final LogicTableName logicTableName) {
-        Set<ColumnName> columnNames = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(logicTableName);
-        if (null == columnNames) {
-            return Optional.empty();
-        }
-        return Optional.of(columnNames.stream().map(ColumnName::getOriginal).collect(Collectors.toList()));
+    public Collection<String> getColumnNames(final LogicTableName logicTableName) {
+        return targetTableColumnsMap.containsKey(logicTableName)
+                ? targetTableColumnsMap.get(logicTableName).stream().map(ColumnName::getOriginal).collect(Collectors.toList())
+                : Collections.singleton("*");
     }
     
     /**
-     * Get column name set of table.
+     * Get column names.
      *
      * @param actualTableName actual table name
-     * @return column names of table
+     * @return column names
      */
-    public Optional<Set<ColumnName>> getColumnNameSet(final String actualTableName) {
-        Set<ColumnName> result = null == targetTableColumnsMap ? null : targetTableColumnsMap.get(getLogicTableName(actualTableName));
-        return Optional.ofNullable(result);
+    public Collection<ColumnName> getColumnNames(final String actualTableName) {
+        return targetTableColumnsMap.getOrDefault(getLogicTableName(actualTableName), Collections.emptySet());
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
index 81a9711856a..395d98cacf2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/sqlbuilder/PipelineInventoryDumpSQLBuilder.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.common.sqlbuilder;
 
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.stream.Collectors;
 
 /**
@@ -42,7 +42,7 @@ public final class PipelineInventoryDumpSQLBuilder {
      * @param uniqueKey unique key
      * @return built SQL
      */
-    public String buildDivisibleSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
+    public String buildDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
         String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
         String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return String.format("SELECT %s FROM %s WHERE %s>=? AND %s<=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey, escapedUniqueKey);
@@ -57,7 +57,7 @@ public final class PipelineInventoryDumpSQLBuilder {
      * @param uniqueKey unique key
      * @return built SQL
      */
-    public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
+    public String buildUnlimitedDivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
         String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
         String escapedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return String.format("SELECT %s FROM %s WHERE %s>=? ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, escapedUniqueKey, escapedUniqueKey);
@@ -72,13 +72,13 @@ public final class PipelineInventoryDumpSQLBuilder {
      * @param uniqueKey unique key
      * @return built SQL
      */
-    public String buildIndivisibleSQL(final String schemaName, final String tableName, final List<String> columnNames, final String uniqueKey) {
+    public String buildIndivisibleSQL(final String schemaName, final String tableName, final Collection<String> columnNames, final String uniqueKey) {
         String qualifiedTableName = sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
         String quotedUniqueKey = sqlSegmentBuilder.getEscapedIdentifier(uniqueKey);
         return String.format("SELECT %s FROM %s ORDER BY %s ASC", buildQueryColumns(columnNames), qualifiedTableName, quotedUniqueKey);
     }
     
-    private String buildQueryColumns(final List<String> columnNames) {
+    private String buildQueryColumns(final Collection<String> columnNames) {
         return columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
     }
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index 31e80d750d5..23462cdf1db 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -57,6 +57,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -162,7 +163,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
         }
         PrimaryKeyPosition<?> primaryKeyPosition = (PrimaryKeyPosition<?>) dumperConfig.getPosition();
         PipelineColumnMetaData firstColumn = dumperConfig.getUniqueKeyColumns().get(0);
-        List<String> columnNames = dumperConfig.getColumnNames(logicTableName).orElse(Collections.singletonList("*"));
+        Collection<String> columnNames = dumperConfig.getColumnNames(logicTableName);
         if (PipelineJdbcUtils.isIntegerColumn(firstColumn.getDataType()) || PipelineJdbcUtils.isStringColumn(firstColumn.getDataType())) {
             if (null != primaryKeyPosition.getBeginValue() && null != primaryKeyPosition.getEndValue()) {
                 return inventoryDumpSQLBuilder.buildDivisibleSQL(schemaName, dumperConfig.getActualTableName(), columnNames, firstColumn.getName());
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 6c5cde7993a..c41f2ae9e33 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -55,12 +55,12 @@ import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.io.Serializable;
 import java.nio.charset.Charset;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * MySQL incremental dumper.
@@ -161,13 +161,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+        Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getAfterRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.INSERT, event, each.length);
             for (int i = 0; i < each.length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
                     continue;
                 }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), true, columnMetaData.isUniqueKey()));
@@ -177,12 +177,12 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         return result;
     }
     
-    private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
-        return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
+    private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
+        return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
     }
     
     private List<DataRecord> handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+        Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (int i = 0; i < event.getBeforeRows().size(); i++) {
             Serializable[] beforeValues = event.getBeforeRows().get(i);
@@ -193,7 +193,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
                 Serializable newValue = afterValues[j];
                 boolean updated = !Objects.equals(newValue, oldValue);
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(j + 1);
-                if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
                     continue;
                 }
                 dataRecord.addColumn(new Column(columnMetaData.getName(),
@@ -206,13 +206,13 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private List<DataRecord> handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) {
-        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(event.getTableName()).orElse(null);
+        Collection<ColumnName> columnNames = dumperConfig.getColumnNames(event.getTableName());
         List<DataRecord> result = new LinkedList<>();
         for (Serializable[] each : event.getBeforeRows()) {
             DataRecord dataRecord = createDataRecord(IngestDataChangeType.DELETE, event, each.length);
             for (int i = 0, length = each.length; i < length; i++) {
                 PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-                if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+                if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
                     continue;
                 }
                 dataRecord.addColumn(new Column(columnMetaData.getName(), handleValue(columnMetaData, each[i]), null, true, columnMetaData.isUniqueKey()));
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index bd261596024..5703ed68ce0 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -35,8 +35,8 @@ import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.UpdateRowEvent
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.WriteRowEvent;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.generic.UnsupportedSQLOperationException;
 
+import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 
 /**
  * WAL event converter.
@@ -128,10 +128,10 @@ public final class WALEventConverter {
     }
     
     private void putColumnsIntoDataRecord(final DataRecord dataRecord, final PipelineTableMetaData tableMetaData, final String actualTableName, final List<Object> values) {
-        Set<ColumnName> columnNameSet = dumperConfig.getColumnNameSet(actualTableName).orElse(null);
+        Collection<ColumnName> columnNames = dumperConfig.getColumnNames(actualTableName);
         for (int i = 0, count = values.size(); i < count; i++) {
             PipelineColumnMetaData columnMetaData = tableMetaData.getColumnMetaData(i + 1);
-            if (isColumnUnneeded(columnNameSet, columnMetaData.getName())) {
+            if (isColumnUnneeded(columnNames, columnMetaData.getName())) {
                 continue;
             }
                 continue;
             }
             boolean isUniqueKey = columnMetaData.isUniqueKey();
@@ -141,7 +141,7 @@ public final class WALEventConverter {
         }
     }
     
-    private boolean isColumnUnneeded(final Set<ColumnName> columnNameSet, final String columnName) {
-        return null != columnNameSet && !columnNameSet.contains(new ColumnName(columnName));
+    private boolean isColumnUnneeded(final Collection<ColumnName> columnNames, final String columnName) {
+        return !columnNames.isEmpty() && !columnNames.contains(new ColumnName(columnName));
     }
 }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 1cd12fdb8b8..451637bd877 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -54,11 +54,11 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -131,7 +131,7 @@ class WALEventConverterTest {
         assertWriteRowEvent0(mockTargetTableColumnsMap(), 1);
     }
     
-    private void assertWriteRowEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
+    private void assertWriteRowEvent0(final Map<LogicTableName, Collection<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
         dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
         WriteRowEvent rowsEvent = new WriteRowEvent();
         rowsEvent.setSchemaName("");
@@ -143,7 +143,7 @@ class WALEventConverterTest {
         assertThat(actual.getColumnCount(), is(expectedColumnCount));
     }
     
-    private Map<LogicTableName, Set<ColumnName>> mockTargetTableColumnsMap() {
+    private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap() {
         return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
     }
     

Reply via email to