This is an automated email from the ASF dual-hosted git repository.
tuichenchuxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new fc9c81c58c9 Make CaseInsensitiveIdentifier as final class (#28982)
fc9c81c58c9 is described below
commit fc9c81c58c9bb8d53cecfb46624fdf2ecbf07a67
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 7 23:20:06 2023 +0800
Make CaseInsensitiveIdentifier as final class (#28982)
* Remove useless SchemaName
* Remove TableName
* Refactor SchemaTableName
* Remove LogicTableName
---
.../common/config/ImporterConfiguration.java | 10 +++----
.../datanode/JobDataNodeLineConvertUtils.java | 9 +++---
.../pipeline/common/metadata/ActualTableName.java | 28 -------------------
.../common/metadata/CaseInsensitiveIdentifier.java | 2 +-
.../pipeline/common/metadata/LogicTableName.java | 28 -------------------
.../data/pipeline/common/metadata/SchemaName.java | 32 ----------------------
.../pipeline/common/metadata/SchemaTableName.java | 13 ++++-----
.../data/pipeline/common/metadata/TableName.java | 31 ---------------------
.../StandardPipelineTableMetaDataLoader.java | 18 ++++++------
.../common/util/ShardingColumnsExtractor.java | 10 +++----
.../mapper/ActualAndLogicTableNameMapper.java | 11 ++++----
.../context/mapper/TableAndSchemaNameMapper.java | 14 +++++-----
...est.java => CaseInsensitiveIdentifierTest.java} | 13 +++------
.../mysql/ingest/MySQLIncrementalDumper.java | 4 +--
.../mysql/ingest/MySQLIncrementalDumperTest.java | 23 ++++++++--------
.../postgresql/ingest/wal/WALEventConverter.java | 12 ++++----
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 13 ++++-----
.../ingest/wal/WALEventConverterTest.java | 19 ++++++-------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 26 +++++++++---------
.../migration/api/impl/MigrationJobAPI.java | 18 ++++++------
.../core/importer/PipelineDataSourceSinkTest.java | 16 +++++------
21 files changed, 109 insertions(+), 241 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index 7b63bbca28a..31720e9e204 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -20,10 +20,10 @@ package
org.apache.shardingsphere.data.pipeline.common.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -45,7 +45,7 @@ public final class ImporterConfiguration {
private final PipelineDataSourceConfiguration dataSourceConfig;
// TODO columnName case-insensitive?
- private final Map<LogicTableName, Set<String>> shardingColumnsMap;
+ private final Map<CaseInsensitiveIdentifier, Set<String>>
shardingColumnsMap;
private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
@@ -63,7 +63,7 @@ public final class ImporterConfiguration {
* @return logic table names
*/
public Collection<String> getLogicTableNames() {
- return
Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(LogicTableName::toString).collect(Collectors.toList()));
+ return
Collections.unmodifiableList(shardingColumnsMap.keySet().stream().map(CaseInsensitiveIdentifier::toString).collect(Collectors.toList()));
}
/**
@@ -73,7 +73,7 @@ public final class ImporterConfiguration {
* @return sharding columns
*/
public Set<String> getShardingColumns(final String logicTableName) {
- return shardingColumnsMap.getOrDefault(new
LogicTableName(logicTableName), Collections.emptySet());
+ return shardingColumnsMap.getOrDefault(new
CaseInsensitiveIdentifier(logicTableName), Collections.emptySet());
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
index 997f38aa8e9..26e3ef3df33 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
@@ -20,8 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.common.datanode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.datanode.DataNode;
import java.util.LinkedHashMap;
@@ -74,11 +73,11 @@ public final class JobDataNodeLineConvertUtils {
* @param dataNodeLine data node line
* @return actual table and logic table map
*/
- public static Map<ActualTableName, LogicTableName> buildTableNameMap(final
JobDataNodeLine dataNodeLine) {
- Map<ActualTableName, LogicTableName> result = new LinkedHashMap<>();
+ public static Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier>
buildTableNameMap(final JobDataNodeLine dataNodeLine) {
+ Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> result = new
LinkedHashMap<>();
for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
for (DataNode dataNode : each.getDataNodes()) {
- result.put(new ActualTableName(dataNode.getTableName()), new
LogicTableName(each.getLogicTableName()));
+ result.put(new
CaseInsensitiveIdentifier(dataNode.getTableName()), new
CaseInsensitiveIdentifier(each.getLogicTableName()));
}
}
return result;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java
deleted file mode 100644
index 10463ed166a..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/ActualTableName.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.metadata;
-
-/**
- * Actual table name.
- */
-public final class ActualTableName extends TableName {
-
- public ActualTableName(final String tableName) {
- super(tableName);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java
index 3955c6864b0..feb48a58483 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifier.java
@@ -24,7 +24,7 @@ import lombok.EqualsAndHashCode;
*/
// TODO table name case-sensitive for some database
@EqualsAndHashCode(of = "lowercase")
-public class CaseInsensitiveIdentifier {
+public final class CaseInsensitiveIdentifier {
private final String original;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java
deleted file mode 100644
index 6c4045f5048..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/LogicTableName.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.metadata;
-
-/**
- * Logic table name.
- */
-public final class LogicTableName extends TableName {
-
- public LogicTableName(final String tableName) {
- super(tableName);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java
deleted file mode 100644
index 3ae4a740fdc..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaName.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.metadata;
-
-import javax.annotation.Nullable;
-
-/**
- * Schema name.
- * <p>It might be null.</p>
- * <p>It's case-insensitive.</p>
- */
-public class SchemaName extends CaseInsensitiveIdentifier {
-
- public SchemaName(@Nullable final String schemaName) {
- super(schemaName);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java
index 56c3e70abed..316cac8d5f4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/SchemaTableName.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.common.metadata;
import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
@@ -30,17 +29,15 @@ import lombok.ToString;
@Getter
@EqualsAndHashCode
@ToString
-public class SchemaTableName {
+public final class SchemaTableName {
- @NonNull
- private final SchemaName schemaName;
+ private final CaseInsensitiveIdentifier schemaName;
- @NonNull
- private final TableName tableName;
+ private final CaseInsensitiveIdentifier tableName;
public SchemaTableName(final String schemaName, final String tableName) {
- this.schemaName = new SchemaName(schemaName);
- this.tableName = new TableName(tableName);
+ this.schemaName = new CaseInsensitiveIdentifier(schemaName);
+ this.tableName = new CaseInsensitiveIdentifier(tableName);
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java
deleted file mode 100644
index 26f2c17be14..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableName.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.common.metadata;
-
-/**
- * Table name.
- *
- * <p>It might be logic table name or actual table name.</p>
- * <p>It's case-insensitive.</p>
- */
-public class TableName extends CaseInsensitiveIdentifier {
-
- public TableName(final String tableName) {
- super(tableName);
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 31708ad4bb7..e7a12eca308 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -19,11 +19,11 @@ package
org.apache.shardingsphere.data.pipeline.common.metadata.loader;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.metadata.TableName;
+import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineIndexMetaData;
import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -53,11 +53,11 @@ public final class StandardPipelineTableMetaDataLoader
implements PipelineTableM
// It doesn't support ShardingSphereDataSource
private final PipelineDataSourceWrapper dataSource;
- private final Map<TableName, PipelineTableMetaData> tableMetaDataMap = new
ConcurrentHashMap<>();
+ private final Map<CaseInsensitiveIdentifier, PipelineTableMetaData>
tableMetaDataMap = new ConcurrentHashMap<>();
@Override
public PipelineTableMetaData getTableMetaData(final String schemaName,
final String tableName) {
- PipelineTableMetaData result = tableMetaDataMap.get(new
TableName(tableName));
+ PipelineTableMetaData result = tableMetaDataMap.get(new
CaseInsensitiveIdentifier(tableName));
if (null != result) {
return result;
}
@@ -66,7 +66,7 @@ public final class StandardPipelineTableMetaDataLoader
implements PipelineTableM
} catch (final SQLException ex) {
throw new PipelineInternalException(String.format("Load meta data
for schema '%s' and table '%s' failed", schemaName, tableName), ex);
}
- result = tableMetaDataMap.get(new TableName(tableName));
+ result = tableMetaDataMap.get(new
CaseInsensitiveIdentifier(tableName));
if (null == result) {
log.warn("getTableMetaData, can not load meta data for table
'{}'", tableName);
}
@@ -76,12 +76,12 @@ public final class StandardPipelineTableMetaDataLoader
implements PipelineTableM
private void loadTableMetaData(final String schemaName, final String
tableNamePattern) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData();
- Map<TableName, PipelineTableMetaData> tableMetaDataMap =
loadTableMetaData0(connection, dialectDatabaseMetaData.isSchemaAvailable() ?
schemaName : null, tableNamePattern);
+ Map<CaseInsensitiveIdentifier, PipelineTableMetaData>
tableMetaDataMap = loadTableMetaData0(connection,
dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null,
tableNamePattern);
this.tableMetaDataMap.putAll(tableMetaDataMap);
}
}
- private Map<TableName, PipelineTableMetaData> loadTableMetaData0(final
Connection connection, final String schemaName, final String tableNamePattern)
throws SQLException {
+ private Map<CaseInsensitiveIdentifier, PipelineTableMetaData>
loadTableMetaData0(final Connection connection, final String schemaName, final
String tableNamePattern) throws SQLException {
Collection<String> tableNames = new LinkedList<>();
try (ResultSet resultSet =
connection.getMetaData().getTables(connection.getCatalog(), schemaName,
tableNamePattern, null)) {
while (resultSet.next()) {
@@ -89,7 +89,7 @@ public final class StandardPipelineTableMetaDataLoader
implements PipelineTableM
tableNames.add(tableName);
}
}
- Map<TableName, PipelineTableMetaData> result = new LinkedHashMap<>();
+ Map<CaseInsensitiveIdentifier, PipelineTableMetaData> result = new
LinkedHashMap<>();
for (String each : tableNames) {
Set<String> primaryKeys = loadPrimaryKeys(connection, schemaName,
each);
Map<String, Collection<String>> uniqueKeys =
loadUniqueIndexesOfTable(connection, schemaName, each);
@@ -112,7 +112,7 @@ public final class StandardPipelineTableMetaDataLoader
implements PipelineTableM
}
Collection<PipelineIndexMetaData> uniqueIndexMetaData =
uniqueKeys.entrySet().stream()
.map(entry -> new PipelineIndexMetaData(entry.getKey(),
entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()))).collect(Collectors.toList());
- result.put(new TableName(each), new PipelineTableMetaData(each,
columnMetaDataMap, uniqueIndexMetaData));
+ result.put(new CaseInsensitiveIdentifier(each), new
PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
}
return result;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java
index 9df6c93f248..8299f12e7ef 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/util/ShardingColumnsExtractor.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.common.util;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
@@ -48,16 +48,16 @@ public final class ShardingColumnsExtractor {
* @param logicTableNames logic table names
* @return sharding columns map
*/
- public Map<LogicTableName, Set<String>> getShardingColumnsMap(final
Collection<YamlRuleConfiguration> yamlRuleConfigs, final Set<LogicTableName>
logicTableNames) {
+ public Map<CaseInsensitiveIdentifier, Set<String>>
getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs,
final Set<CaseInsensitiveIdentifier> logicTableNames) {
Optional<ShardingRuleConfiguration> shardingRuleConfig =
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs);
if (!shardingRuleConfig.isPresent()) {
return Collections.emptyMap();
}
Set<String> defaultDatabaseShardingColumns =
extractShardingColumns(shardingRuleConfig.get().getDefaultDatabaseShardingStrategy());
Set<String> defaultTableShardingColumns =
extractShardingColumns(shardingRuleConfig.get().getDefaultTableShardingStrategy());
- Map<LogicTableName, Set<String>> result = new ConcurrentHashMap<>();
+ Map<CaseInsensitiveIdentifier, Set<String>> result = new
ConcurrentHashMap<>();
for (ShardingTableRuleConfiguration each :
shardingRuleConfig.get().getTables()) {
- LogicTableName logicTableName = new
LogicTableName(each.getLogicTable());
+ CaseInsensitiveIdentifier logicTableName = new
CaseInsensitiveIdentifier(each.getLogicTable());
if (!logicTableNames.contains(logicTableName)) {
continue;
}
@@ -67,7 +67,7 @@ public final class ShardingColumnsExtractor {
result.put(logicTableName, shardingColumns);
}
for (ShardingAutoTableRuleConfiguration each :
shardingRuleConfig.get().getAutoTables()) {
- LogicTableName logicTableName = new
LogicTableName(each.getLogicTable());
+ CaseInsensitiveIdentifier logicTableName = new
CaseInsensitiveIdentifier(each.getLogicTable());
if (!logicTableNames.contains(logicTableName)) {
continue;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
index 69c4ba3fa7a..135a26137df 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
@@ -20,8 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mappe
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import java.util.Map;
@@ -33,7 +32,7 @@ import java.util.Map;
@ToString
public final class ActualAndLogicTableNameMapper {
- private final Map<ActualTableName, LogicTableName> tableNameMap;
+ private final Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier>
tableNameMap;
/**
* Get logic table name.
@@ -41,8 +40,8 @@ public final class ActualAndLogicTableNameMapper {
* @param actualTableName actual table name
* @return logic table name
*/
- public LogicTableName getLogicTableName(final String actualTableName) {
- return tableNameMap.get(new ActualTableName(actualTableName));
+ public CaseInsensitiveIdentifier getLogicTableName(final String
actualTableName) {
+ return tableNameMap.get(new
CaseInsensitiveIdentifier(actualTableName));
}
/**
@@ -52,6 +51,6 @@ public final class ActualAndLogicTableNameMapper {
* @return contains or not
*/
public boolean containsTable(final String actualTableName) {
- return tableNameMap.containsKey(new ActualTableName(actualTableName));
+ return tableNameMap.containsKey(new
CaseInsensitiveIdentifier(actualTableName));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
index bf7c9edd6dd..9c250533a54 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import java.util.Collection;
import java.util.Collections;
@@ -33,7 +33,7 @@ import java.util.stream.Collectors;
@ToString
public final class TableAndSchemaNameMapper {
- private final Map<LogicTableName, String> mapping;
+ private final Map<CaseInsensitiveIdentifier, String> mapping;
public TableAndSchemaNameMapper(final Map<String, String> tableSchemaMap) {
mapping = null == tableSchemaMap ? Collections.emptyMap() :
getLogicTableNameMap(tableSchemaMap);
@@ -44,13 +44,13 @@ public final class TableAndSchemaNameMapper {
mapping = getLogicTableNameMap(tableNameSchemaMap);
}
- private Map<LogicTableName, String> getLogicTableNameMap(final Map<String,
String> tableSchemaMap) {
- Map<LogicTableName, String> result = new
HashMap<>(tableSchemaMap.size(), 1F);
+ private Map<CaseInsensitiveIdentifier, String> getLogicTableNameMap(final
Map<String, String> tableSchemaMap) {
+ Map<CaseInsensitiveIdentifier, String> result = new
HashMap<>(tableSchemaMap.size(), 1F);
for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
String tableName = entry.getKey();
String schemaName = entry.getValue();
if (null != schemaName) {
- result.put(new LogicTableName(tableName), schemaName);
+ result.put(new CaseInsensitiveIdentifier(tableName),
schemaName);
}
}
return result;
@@ -63,7 +63,7 @@ public final class TableAndSchemaNameMapper {
* @return schema name
*/
public String getSchemaName(final String logicTableName) {
- return mapping.get(new LogicTableName(logicTableName));
+ return mapping.get(new CaseInsensitiveIdentifier(logicTableName));
}
/**
@@ -72,7 +72,7 @@ public final class TableAndSchemaNameMapper {
* @param logicTableName logic table name
* @return schema name
*/
- public String getSchemaName(final LogicTableName logicTableName) {
+ public String getSchemaName(final CaseInsensitiveIdentifier
logicTableName) {
return mapping.get(logicTableName);
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableNameTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifierTest.java
similarity index 77%
rename from
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableNameTest.java
rename to
kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifierTest.java
index 36a7899cac1..f8ac93a28e5 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/TableNameTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/CaseInsensitiveIdentifierTest.java
@@ -19,24 +19,19 @@ package
org.apache.shardingsphere.data.pipeline.common.metadata;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-class TableNameTest {
+class CaseInsensitiveIdentifierTest {
@Test
- void assertGetLowercase() {
- for (String tableName : Arrays.asList("t_order", "T_ORDER")) {
- TableName actual = new TableName(tableName);
- assertThat(actual.getLowercase(), is("t_order"));
- }
+ void assertEquals() {
+ assertThat(new CaseInsensitiveIdentifier("t_order"), is(new
CaseInsensitiveIdentifier("T_ORDER")));
}
@Test
void assertToString() {
- TableName actual = new TableName("T_ORDER");
+ CaseInsensitiveIdentifier actual = new
CaseInsensitiveIdentifier("T_ORDER");
assertThat(actual.toString(), is("T_ORDER"));
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 98fedbed774..fd6a290997c 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
@@ -156,7 +156,7 @@ public final class MySQLIncrementalDumper extends
AbstractPipelineLifecycleRunna
}
private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
- LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+ CaseInsensitiveIdentifier logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 757788150d5..c95d9c79d30 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -18,23 +18,22 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.EmptyAckCallback;
import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.DeleteRowsEvent;
@@ -99,7 +98,7 @@ class MySQLIncrementalDumperTest {
private IncrementalDumperContext createDumperContext() {
DumperCommonContext commonContext = new DumperCommonContext(null,
new
StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test",
"root", "root"),
- new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
+ new ActualAndLogicTableNameMapper(Collections.singletonMap(new CaseInsensitiveIdentifier("t_order"), new CaseInsensitiveIdentifier("t_order"))),
new TableAndSchemaNameMapper(Collections.emptyMap()));
return new IncrementalDumperContext(commonContext, null, false);
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index 56fcf76793a..bff4087b609 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -17,16 +17,16 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWALEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
@@ -90,7 +90,7 @@ public final class WALEventConverter {
}
private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
- LogicTableName logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+ CaseInsensitiveIdentifier logicTableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 10ab93b7e79..57374b61878 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -18,18 +18,17 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.EmptyAckCallback;
import
org.apache.shardingsphere.data.pipeline.common.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
@@ -109,7 +108,7 @@ class PostgreSQLWALDumperTest {
private IncrementalDumperContext createDumperContext(final String jdbcUrl,
final String username, final String password) {
DumperCommonContext commonContext = new DumperCommonContext(null,
new StandardPipelineDataSourceConfiguration(jdbcUrl, username,
password),
- new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))),
+ new ActualAndLogicTableNameMapper(Collections.singletonMap(new CaseInsensitiveIdentifier("t_order_0"), new CaseInsensitiveIdentifier("t_order"))),
new TableAndSchemaNameMapper(Collections.emptyMap()));
return new IncrementalDumperContext(commonContext, "0101123456",
false);
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 2d835035350..70f4fbbf6e6 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -18,6 +18,14 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
@@ -25,15 +33,6 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.core.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineTableMetaData;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.BeginTXEvent;
@@ -88,7 +87,7 @@ class WALEventConverterTest {
private IncrementalDumperContext mockDumperContext() {
DumperCommonContext commonContext = new DumperCommonContext(null,
new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"),
- new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))),
+ new ActualAndLogicTableNameMapper(Collections.singletonMap(new CaseInsensitiveIdentifier("t_order"), new CaseInsensitiveIdentifier("t_order"))),
new TableAndSchemaNameMapper(Collections.emptyMap()));
return new IncrementalDumperContext(commonContext, null, false);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 96ea1edd716..c81675d26da 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -22,16 +22,8 @@ import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.common.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
@@ -54,15 +46,19 @@ import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
@@ -70,12 +66,15 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
-import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -281,8 +280,9 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfig =
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
- dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
+ Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> tableNameMap = new LinkedHashMap<>();
+ dataNodeLine.getEntries()
+ .forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new CaseInsensitiveIdentifier(node.getTableName()), new CaseInsensitiveIdentifier(each.getLogicTableName()))));
return new IncrementalDumperContext(
new DumperCommonContext(dataSourceName,
actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap),
tableAndSchemaNameMapper),
jobConfig.getJobId(), jobConfig.isDecodeWithTX());
@@ -295,8 +295,8 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
CDCProcessContext processContext = new
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
JobRateLimitAlgorithm writeRateLimitAlgorithm =
processContext.getWriteRateLimitAlgorithm();
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
- .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
+ Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
+ .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 987620ee630..f58330485e6 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -19,14 +19,9 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
import
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration.CreateTableEntry;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
@@ -42,12 +37,16 @@ import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.common.metadata.SchemaTableName;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
@@ -56,6 +55,8 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.connection.Registe
import
org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
import
org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
@@ -69,7 +70,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
-import
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -265,8 +265,8 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- Set<LogicTableName> targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
- Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
+ Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
+ Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
ImporterConfiguration importerConfig = buildImporterConfiguration(
jobConfig, pipelineProcessConfig, shardingColumnsMap,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
@@ -296,7 +296,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
}
private ImporterConfiguration buildImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
- final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
MigrationProcessContext processContext = new
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
JobRateLimitAlgorithm writeRateLimitAlgorithm =
processContext.getWriteRateLimitAlgorithm();
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 88a49fba7a9..7908c1a1f28 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -17,24 +17,24 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.importer;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
-import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.common.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.importer.SingleChannelConsumerImporter;
import
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineDataSourceSink;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
+import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.FixtureInventoryIncrementalJobItemContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -192,7 +192,7 @@ class PipelineDataSourceSinkTest {
}
private ImporterConfiguration mockImporterConfiguration() {
- Map<LogicTableName, Set<String>> shardingColumnsMap = Collections.singletonMap(new LogicTableName("test_table"), Collections.singleton("user"));
+ Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), Collections.singleton("user"));
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
}
}