This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new cd287d6530f Use ShardingSphereIdentifier on ShardingColumnsExtractor 
(#33949)
cd287d6530f is described below

commit cd287d6530f6e80251a6f6947351dc8e8f23bf89
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 7 13:06:00 2024 +0800

    Use ShardingSphereIdentifier on ShardingColumnsExtractor (#33949)
    
    * Fix E2EEnvironmentEngine
    
    * Use ShardingSphereIdentifier on ShardingColumnsExtractor
    
    * Use ShardingSphereIdentifier on ShardingColumnsExtractor
    
    * Use ShardingSphereIdentifier on ShardingColumnsExtractor
---
 .../ShardingRuleConfigurationConverter.java        |  2 --
 .../core/importer/ImporterConfiguration.java       | 12 ++++----
 .../core/util/ShardingColumnsExtractor.java        | 32 ++++++++++------------
 .../core/importer/ImporterConfigurationTest.java   |  6 ++--
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  6 ++--
 .../migration/MigrationJobExecutorCallback.java    |  8 +++---
 .../test/e2e/env/E2EEnvironmentEngine.java         |  3 +-
 .../sink/type/PipelineDataSourceSinkTest.java      |  4 +--
 .../pipeline/core/util/PipelineContextUtils.java   |  8 +++---
 9 files changed, 40 insertions(+), 41 deletions(-)

diff --git 
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java
 
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java
index f869417b033..66e12f94d66 100644
--- 
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java
+++ 
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java
@@ -38,7 +38,6 @@ public final class ShardingRuleConfigurationConverter {
      *
      * @param yamlRuleConfigs YAML rule configurations
      * @return sharding rule configuration
-     * @throws IllegalStateException if there is no available sharding rule
      */
     public static Optional<ShardingRuleConfiguration> 
findAndConvertShardingRuleConfiguration(final Collection<YamlRuleConfiguration> 
yamlRuleConfigs) {
         return findYamlShardingRuleConfiguration(yamlRuleConfigs).map(each -> 
new YamlShardingRuleConfigurationSwapper().swapToObject(each));
@@ -49,7 +48,6 @@ public final class ShardingRuleConfigurationConverter {
      *
      * @param yamlRuleConfigs YAML rule configurations
      * @return YAML sharding rule configuration
-     * @throws IllegalStateException if there is no available sharding rule
      */
     public static Optional<YamlShardingRuleConfiguration> 
findYamlShardingRuleConfiguration(final Collection<YamlRuleConfiguration> 
yamlRuleConfigs) {
         return 
yamlRuleConfigs.stream().filter(YamlShardingRuleConfiguration.class::isInstance).findFirst().map(YamlShardingRuleConfiguration.class::cast);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index 58cfd0e3909..bb0ebca58cd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -17,16 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.importer;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -45,7 +46,8 @@ public final class ImporterConfiguration {
     
     private final PipelineDataSourceConfiguration dataSourceConfig;
     
-    private final Map<CaseInsensitiveIdentifier, Set<String>> 
shardingColumnsMap;
+    @Getter(AccessLevel.NONE)
+    private final Map<ShardingSphereIdentifier, Set<String>> 
shardingColumnsMap;
     
     private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
     
@@ -64,7 +66,7 @@ public final class ImporterConfiguration {
      * @return sharding columns
      */
     public Set<String> getShardingColumns(final String logicTableName) {
-        return shardingColumnsMap.getOrDefault(new 
CaseInsensitiveIdentifier(logicTableName), Collections.emptySet());
+        return shardingColumnsMap.getOrDefault(new 
ShardingSphereIdentifier(logicTableName), Collections.emptySet());
     }
     
     /**
@@ -85,6 +87,6 @@ public final class ImporterConfiguration {
      */
     public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
         return shardingColumnsMap.keySet().stream()
-                .map(CaseInsensitiveIdentifier::toString).map(each -> new 
CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), 
each)).collect(Collectors.toList());
+                .map(ShardingSphereIdentifier::getValue).map(each -> new 
CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), 
each)).collect(Collectors.toList());
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java
index 276b8581c70..951227516d0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.util;
 
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
@@ -48,39 +48,37 @@ public final class ShardingColumnsExtractor {
      * @param logicTableNames logic table names
      * @return sharding columns map
      */
-    public Map<CaseInsensitiveIdentifier, Set<String>> 
getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs, 
final Set<CaseInsensitiveIdentifier> logicTableNames) {
+    public Map<ShardingSphereIdentifier, Set<String>> 
getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs, 
final Collection<ShardingSphereIdentifier> logicTableNames) {
         Optional<ShardingRuleConfiguration> shardingRuleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs);
         if (!shardingRuleConfig.isPresent()) {
             return Collections.emptyMap();
         }
         Set<String> defaultDatabaseShardingColumns = 
extractShardingColumns(shardingRuleConfig.get().getDefaultDatabaseShardingStrategy());
         Set<String> defaultTableShardingColumns = 
extractShardingColumns(shardingRuleConfig.get().getDefaultTableShardingStrategy());
-        Map<CaseInsensitiveIdentifier, Set<String>> result = new 
ConcurrentHashMap<>(shardingRuleConfig.get().getTables().size(), 1F);
+        // TODO check is it need to be ConcurrentHashMap?
+        // TODO check is it need to be ShardingSphereIdentifier with column 
names?
+        Map<ShardingSphereIdentifier, Set<String>> result = new 
ConcurrentHashMap<>(shardingRuleConfig.get().getTables().size(), 1F);
         for (ShardingTableRuleConfiguration each : 
shardingRuleConfig.get().getTables()) {
-            CaseInsensitiveIdentifier logicTableName = new 
CaseInsensitiveIdentifier(each.getLogicTable());
-            if (!logicTableNames.contains(logicTableName)) {
-                continue;
+            ShardingSphereIdentifier logicTableName = new 
ShardingSphereIdentifier(each.getLogicTable());
+            if (logicTableNames.contains(logicTableName)) {
+                Set<String> shardingColumns = new HashSet<>();
+                shardingColumns.addAll(null == 
each.getDatabaseShardingStrategy() ? defaultDatabaseShardingColumns : 
extractShardingColumns(each.getDatabaseShardingStrategy()));
+                shardingColumns.addAll(null == each.getTableShardingStrategy() 
? defaultTableShardingColumns : 
extractShardingColumns(each.getTableShardingStrategy()));
+                result.put(logicTableName, shardingColumns);
             }
-            Set<String> shardingColumns = new HashSet<>();
-            shardingColumns.addAll(null == each.getDatabaseShardingStrategy() 
? defaultDatabaseShardingColumns : 
extractShardingColumns(each.getDatabaseShardingStrategy()));
-            shardingColumns.addAll(null == each.getTableShardingStrategy() ? 
defaultTableShardingColumns : 
extractShardingColumns(each.getTableShardingStrategy()));
-            result.put(logicTableName, shardingColumns);
         }
         for (ShardingAutoTableRuleConfiguration each : 
shardingRuleConfig.get().getAutoTables()) {
-            CaseInsensitiveIdentifier logicTableName = new 
CaseInsensitiveIdentifier(each.getLogicTable());
-            if (!logicTableNames.contains(logicTableName)) {
-                continue;
+            ShardingSphereIdentifier logicTableName = new 
ShardingSphereIdentifier(each.getLogicTable());
+            if (logicTableNames.contains(logicTableName)) {
+                result.put(logicTableName, 
extractShardingColumns(each.getShardingStrategy()));
             }
-            ShardingStrategyConfiguration shardingStrategy = 
each.getShardingStrategy();
-            Set<String> shardingColumns = new 
HashSet<>(extractShardingColumns(shardingStrategy));
-            result.put(logicTableName, shardingColumns);
         }
         return result;
     }
     
     private Set<String> extractShardingColumns(final 
ShardingStrategyConfiguration shardingStrategy) {
         if (shardingStrategy instanceof StandardShardingStrategyConfiguration) 
{
-            return new 
HashSet<>(Collections.singleton(((StandardShardingStrategyConfiguration) 
shardingStrategy).getShardingColumn()));
+            return 
Collections.singleton(((StandardShardingStrategyConfiguration) 
shardingStrategy).getShardingColumn());
         }
         if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
             return new 
HashSet<>(Arrays.asList(((ComplexShardingStrategyConfiguration) 
shardingStrategy).getShardingColumns().split(",")));
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
index e7725dcd564..d0bbb7024aa 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
@@ -21,8 +21,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
@@ -39,7 +39,7 @@ class ImporterConfigurationTest {
     @Test
     void assertGetShardingColumns() {
         ImporterConfiguration importerConfig = new ImporterConfiguration(
-                mock(PipelineDataSourceConfiguration.class), 
Collections.singletonMap(new CaseInsensitiveIdentifier("foo_tbl"), 
Collections.singleton("foo_col")),
+                mock(PipelineDataSourceConfiguration.class), 
Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"), 
Collections.singleton("foo_col")),
                 mock(TableAndSchemaNameMapper.class), 1, 
mock(JobRateLimitAlgorithm.class), 1, 1);
         assertThat(importerConfig.getShardingColumns("foo_tbl"), 
is(Collections.singleton("foo_col")));
     }
@@ -57,7 +57,7 @@ class ImporterConfigurationTest {
         TableAndSchemaNameMapper tableAndSchemaNameMapper = 
mock(TableAndSchemaNameMapper.class);
         
when(tableAndSchemaNameMapper.getSchemaName("foo_tbl")).thenReturn("foo_schema");
         ImporterConfiguration importerConfig = new ImporterConfiguration(
-                mock(PipelineDataSourceConfiguration.class), 
Collections.singletonMap(new CaseInsensitiveIdentifier("foo_tbl"), 
Collections.singleton("foo_col")),
+                mock(PipelineDataSourceConfiguration.class), 
Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"), 
Collections.singleton("foo_col")),
                 tableAndSchemaNameMapper, 1, 
mock(JobRateLimitAlgorithm.class), 1, 1);
         assertThat(importerConfig.getQualifiedTables(), 
is(Collections.singletonList(new CaseInsensitiveQualifiedTable("foo_schema", 
"foo_tbl"))));
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 7a111945446..03cab1ef778 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -64,7 +64,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.Pi
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Collection;
@@ -149,8 +149,8 @@ public final class CDCJob implements PipelineJob {
                                                              final 
Collection<String> schemaTableNames, final TableAndSchemaNameMapper mapper) {
         PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(
                 jobConfig.getDataSourceConfig().getType(), 
jobConfig.getDataSourceConfig().getParameter());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
-                
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
+                
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
schemaTableNames.stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet()));
         PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
         JobRateLimitAlgorithm writeRateLimitAlgorithm = null == 
write.getRateLimiter() ? null
                 : TypedSPILoader.getService(JobRateLimitAlgorithm.class, 
write.getRateLimiter().getType(), write.getRateLimiter().getProps());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index 2fde01e7632..cb1f86c252e 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -40,8 +40,8 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -65,8 +65,8 @@ public final class MigrationJobExecutorCallback implements 
DistributedPipelineJo
     private MigrationTaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobDataNodeLine(jobShardingItem));
         Collection<CreateTableConfiguration> createTableConfigs = 
buildCreateTableConfigurations(jobConfig, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
+        Set<ShardingSphereIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet());
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
         ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
         return new 
MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(),
 createTableConfigs, incrementalDumperContext, importerConfig);
@@ -86,7 +86,7 @@ public final class MigrationJobExecutorCallback implements 
DistributedPipelineJo
     }
     
     private ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
-                                                             final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper mapper) {
+                                                             final 
Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper mapper) {
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
         int retryTimes = jobConfig.getRetryTimes();
diff --git 
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java
 
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java
index c15f87c0846..b1eb9e3b03c 100644
--- 
a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java
+++ 
b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.test.e2e.env;
 
 import lombok.Getter;
 import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.test.e2e.container.compose.ContainerComposer;
 import 
org.apache.shardingsphere.test.e2e.container.compose.ContainerComposerRegistry;
@@ -68,7 +69,7 @@ public final class E2EEnvironmentEngine {
     
     @SneakyThrows({SQLException.class, IOException.class})
     private void executeLogicDatabaseInitSQLFileOnlyOnce(final String key, 
final String scenario, final DatabaseType databaseType, final DataSource 
targetDataSource) {
-        Optional<String> logicDatabaseInitSQLFile = new 
ScenarioDataPath(scenario).findActualDatabaseInitSQLFile("foo_db", 
databaseType);
+        Optional<String> logicDatabaseInitSQLFile = new 
ScenarioDataPath(scenario).findActualDatabaseInitSQLFile(DefaultDatabase.LOGIC_NAME,
 databaseType);
         if (!logicDatabaseInitSQLFile.isPresent()) {
             return;
         }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
index 53e13552ac9..b15284017af 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
@@ -32,7 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.algorithm.FixtureTransmissionJobItemContext;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -90,7 +90,7 @@ class PipelineDataSourceSinkTest {
     }
     
     private ImporterConfiguration mockImporterConfiguration() {
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = 
Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), 
Collections.singleton("user"));
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = 
Collections.singletonMap(new ShardingSphereIdentifier("test_table"), 
Collections.singleton("user"));
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
     }
     
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index eb3b1038854..94ace243fbe 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -52,10 +52,10 @@ import 
org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
@@ -210,8 +210,8 @@ public final class PipelineContextUtils {
     private static MigrationTaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
         Collection<CreateTableConfiguration> createTableConfigs = 
buildCreateTableConfigurations(jobConfig, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
+        Set<ShardingSphereIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet());
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
         ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
         return new 
MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(),
 createTableConfigs, incrementalDumperContext, importerConfig);
@@ -233,7 +233,7 @@ public final class PipelineContextUtils {
     }
     
     private static ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
-                                                                    final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+                                                                    final 
Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
         int retryTimes = jobConfig.getRetryTimes();

Reply via email to