This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new dbf81f2fd73 Rename TableNameSchemaNameMapping to
TableAndSchemaNameMapper (#28939)
dbf81f2fd73 is described below
commit dbf81f2fd738060246e75036ed524b6ac172112e
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 4 19:00:57 2023 +0800
Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper (#28939)
* Refactor ImporterConfiguration.findSchemaName()
* Add more constructor on TableNameSchemaNameMapping
* Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper
---
...eMapping.java => TableAndSchemaNameMapper.java} | 13 +++++++---
.../ingest/dumper/context/DumperCommonContext.java | 10 ++++----
.../dumper/context/InventoryDumperContext.java | 2 +-
.../context/TableNameSchemaNameMappingTest.java | 13 +++++++---
.../common/config/ImporterConfiguration.java | 15 +++++------
.../core/importer/sink/PipelineDataSourceSink.java | 11 +++-----
.../core/preparer/PipelineJobPreparerUtils.java | 2 +-
.../preparer/datasource/DataSourceCheckEngine.java | 8 +++---
.../datasource/DataSourceCheckEngineTest.java | 6 ++---
.../mysql/ingest/MySQLIncrementalDumperTest.java | 4 +--
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 4 +--
.../ingest/wal/WALEventConverterTest.java | 4 +--
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 29 +++++++---------------
.../migration/api/impl/MigrationJobAPI.java | 16 ++++++------
.../MigrationIncrementalDumperContextCreator.java | 10 ++++----
.../core/importer/PipelineDataSourceSinkTest.java | 4 +--
16 files changed, 75 insertions(+), 76 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
similarity index 80%
rename from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
rename to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
index ea56984a4b2..3999329ae63 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
@@ -20,23 +20,30 @@ package org.apache.shardingsphere.data.pipeline.api.context;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
/**
- * Table name and schema name mapping.
+ * Table and schema name mapper.
*/
@ToString
-public final class TableNameSchemaNameMapping {
+public final class TableAndSchemaNameMapper {
private final Map<LogicTableName, String> mapping;
- public TableNameSchemaNameMapping(final Map<String, String>
tableSchemaMap) {
+ public TableAndSchemaNameMapper(final Map<String, String> tableSchemaMap) {
mapping = null == tableSchemaMap ? Collections.emptyMap() :
getLogicTableNameMap(tableSchemaMap);
}
+ public TableAndSchemaNameMapper(final Collection<String> tableNames) {
+ Map<String, String> tableNameSchemaMap = tableNames.stream().map(each
-> each.split("\\.")).filter(split -> split.length >
1).collect(Collectors.toMap(split -> split[1], split -> split[0]));
+ mapping = getLogicTableNameMap(tableNameSchemaMap);
+ }
+
private Map<LogicTableName, String> getLogicTableNameMap(final Map<String,
String> tableSchemaMap) {
Map<LogicTableName, String> result = new
HashMap<>(tableSchemaMap.size(), 1F);
for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index 69491ddcc9a..4a640fdb7bf 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -33,7 +33,7 @@ import java.util.Map;
*/
@Getter
@Setter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
public abstract class DumperCommonContext {
private String dataSourceName;
@@ -42,7 +42,7 @@ public abstract class DumperCommonContext {
private Map<ActualTableName, LogicTableName> tableNameMap;
- private TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ private TableAndSchemaNameMapper tableAndSchemaNameMapper;
private IngestPosition position;
@@ -77,7 +77,7 @@ public abstract class DumperCommonContext {
* @return schema name. nullable
*/
public String getSchemaName(final LogicTableName logicTableName) {
- return tableNameSchemaNameMapping.getSchemaName(logicTableName);
+ return tableAndSchemaNameMapper.getSchemaName(logicTableName);
}
/**
@@ -87,6 +87,6 @@ public abstract class DumperCommonContext {
* @return schema name, can be nullable
*/
public String getSchemaName(final ActualTableName actualTableName) {
- return
tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
+ return
tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
}
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index 45eaf8e1ebe..d72f07c7eaf 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -55,7 +55,7 @@ public final class InventoryDumperContext extends
DumperCommonContext {
setDataSourceName(dumperContext.getDataSourceName());
setDataSourceConfig(dumperContext.getDataSourceConfig());
setTableNameMap(dumperContext.getTableNameMap());
-
setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
+
setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
}
/**
diff --git
a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
index a640ee35df0..9eb21b6148c 100644
---
a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
+++
b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
@@ -19,7 +19,9 @@ package org.apache.shardingsphere.data.pipeline.api.context;
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,16 +32,21 @@ class TableNameSchemaNameMappingTest {
@Test
void assertConstructFromNull() {
- assertDoesNotThrow(() -> new TableNameSchemaNameMapping(null));
+ assertDoesNotThrow(() -> new TableAndSchemaNameMapper((Map<String,
String>) null));
}
@Test
void assertConstructFromValueNullMap() {
- assertNull(new
TableNameSchemaNameMapping(Collections.singletonMap("t_order",
null)).getSchemaName("t_order"));
+ assertNull(new
TableAndSchemaNameMapper(Collections.singletonMap("t_order",
null)).getSchemaName("t_order"));
}
@Test
void assertConstructFromMap() {
- assertThat(new
TableNameSchemaNameMapping(Collections.singletonMap("t_order",
"public")).getSchemaName("t_order"), is("public"));
+ assertThat(new
TableAndSchemaNameMapper(Collections.singletonMap("t_order",
"public")).getSchemaName("t_order"), is("public"));
+ }
+
+ @Test
+ void assertConstructFromCollection() {
+ assertThat(new
TableAndSchemaNameMapper(Arrays.asList("public.t_order",
"t_order_item")).getSchemaName("t_order"), is("public"));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index a92bce38304..0116904291a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.common.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -30,6 +30,7 @@ import
org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -38,7 +39,7 @@ import java.util.stream.Collectors;
*/
@RequiredArgsConstructor
@Getter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
public final class ImporterConfiguration {
private final PipelineDataSourceConfiguration dataSourceConfig;
@@ -46,7 +47,7 @@ public final class ImporterConfiguration {
// TODO columnName case-insensitive?
private final Map<LogicTableName, Set<String>> shardingColumnsMap;
- private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+ private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
private final int batchSize;
@@ -76,13 +77,13 @@ public final class ImporterConfiguration {
}
/**
- * Get schema name.
+ * Find schema name.
*
* @param logicTableName logic table name
- * @return schema name. nullable
+ * @return schema name
*/
- public String getSchemaName(final LogicTableName logicTableName) {
+ public Optional<String> findSchemaName(final String logicTableName) {
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
- return dialectDatabaseMetaData.isSchemaAvailable() ?
tableNameSchemaNameMapping.getSchemaName(logicTableName) : null;
+ return dialectDatabaseMetaData.isSchemaAvailable() ?
Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) :
Optional.empty();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 0995292e0db..fe3134354bd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -26,7 +26,6 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import
org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
@@ -169,7 +168,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private void executeBatchInsert(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
- String insertSql =
importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()),
dataRecord);
+ String insertSql =
importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord);
try (PreparedStatement preparedStatement =
connection.prepareStatement(insertSql)) {
batchInsertStatement.set(preparedStatement);
preparedStatement.setQueryTimeout(30);
@@ -185,10 +184,6 @@ public final class PipelineDataSourceSink implements
PipelineSink {
}
}
- private String getSchemaName(final String logicTableName) {
- return getImporterConfig().getSchemaName(new
LogicTableName(logicTableName));
- }
-
private void executeUpdate(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
for (DataRecord each : dataRecords) {
executeUpdate(connection, each);
@@ -199,7 +194,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
Set<String> shardingColumns =
importerConfig.getShardingColumns(dataRecord.getTableName());
List<Column> conditionColumns =
RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
List<Column> setColumns =
dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
- String updateSql =
importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()),
dataRecord, conditionColumns);
+ String updateSql =
importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord, conditionColumns);
try (PreparedStatement preparedStatement =
connection.prepareStatement(updateSql)) {
updateStatement.set(preparedStatement);
for (int i = 0; i < setColumns.size(); i++) {
@@ -226,7 +221,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
private void executeBatchDelete(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
DataRecord dataRecord = dataRecords.get(0);
- String deleteSQL =
importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()),
dataRecord,
+ String deleteSQL =
importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null),
dataRecord,
RecordUtils.extractConditionColumns(dataRecord,
importerConfig.getShardingColumns(dataRecord.getTableName())));
try (PreparedStatement preparedStatement =
connection.prepareStatement(deleteSQL)) {
batchDeleteStatement.set(preparedStatement);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index e649878a0c7..2f38ea4c72c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -166,7 +166,7 @@ public final class PipelineJobPreparerUtils {
}
DataSourceCheckEngine dataSourceCheckEngine = new
DataSourceCheckEngine(databaseType);
dataSourceCheckEngine.checkConnection(targetDataSources);
- dataSourceCheckEngine.checkTargetTable(targetDataSources,
importerConfig.getTableNameSchemaNameMapping(),
importerConfig.getLogicTableNames());
+ dataSourceCheckEngine.checkTargetTable(targetDataSources,
importerConfig.getTableAndSchemaNameMapper(),
importerConfig.getLogicTableNames());
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
index f58f89bb8f4..7b76c512c85 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
@@ -66,17 +66,17 @@ public final class DataSourceCheckEngine {
* Check table is empty.
*
* @param dataSources data sources
- * @param tableNameSchemaNameMapping mapping
+ * @param tableAndSchemaNameMapper mapping
* @param logicTableNames logic table names
* @throws PrepareJobWithInvalidConnectionException prepare job with
invalid connection exception
*/
// TODO rename to common usage name
// TODO Merge schemaName and tableNames
- public void checkTargetTable(final Collection<? extends DataSource>
dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final
Collection<String> logicTableNames) {
+ public void checkTargetTable(final Collection<? extends DataSource>
dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final
Collection<String> logicTableNames) {
try {
for (DataSource each : dataSources) {
for (String tableName : logicTableNames) {
- if (!checkEmpty(each,
tableNameSchemaNameMapping.getSchemaName(tableName), tableName)) {
+ if (!checkEmpty(each,
tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
throw new
PrepareJobWithTargetTableNotEmptyException(tableName);
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index 6922b6642ae..d74115d295d 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -85,7 +85,7 @@ class DataSourceCheckEngineTest {
when(dataSource.getConnection()).thenReturn(connection);
when(connection.prepareStatement("SELECT * FROM t_order LIMIT
1")).thenReturn(preparedStatement);
when(preparedStatement.executeQuery()).thenReturn(resultSet);
- dataSourceCheckEngine.checkTargetTable(dataSources, new
TableNameSchemaNameMapping(Collections.emptyMap()),
Collections.singletonList("t_order"));
+ dataSourceCheckEngine.checkTargetTable(dataSources, new
TableAndSchemaNameMapper(Collections.emptyMap()),
Collections.singletonList("t_order"));
}
@Test
@@ -95,6 +95,6 @@ class DataSourceCheckEngineTest {
when(preparedStatement.executeQuery()).thenReturn(resultSet);
when(resultSet.next()).thenReturn(true);
assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
- () -> dataSourceCheckEngine.checkTargetTable(dataSources, new
TableNameSchemaNameMapping(Collections.emptyMap()),
Collections.singletonList("t_order")));
+ () -> dataSourceCheckEngine.checkTargetTable(dataSources, new
TableAndSchemaNameMapper(Collections.emptyMap()),
Collections.singletonList("t_order")));
}
}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 876dd1beacf..25bbaa33ff1 100644
---
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -92,7 +92,7 @@ class MySQLIncrementalDumperTest {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL",
"root", "root"));
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order")));
- result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(Collections.emptyMap()));
+ result.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
return result;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index c177dbb5963..e59c6a0f353 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -108,7 +108,7 @@ class PostgreSQLWALDumperTest {
result.setJobId("0101123456");
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order_0"), new LogicTableName("t_order")));
- result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(Collections.emptyMap()));
+ result.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
return result;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 4d5f9d35ffd..93a4e721074 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -87,7 +87,7 @@ class WALEventConverterTest {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setDataSourceConfig(new
StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL",
"root", "root"));
result.setTableNameMap(Collections.singletonMap(new
ActualTableName("t_order"), new LogicTableName("t_order")));
- result.setTableNameSchemaNameMapping(new
TableNameSchemaNameMapping(Collections.emptyMap()));
+ result.setTableAndSchemaNameMapper(new
TableAndSchemaNameMapper(Collections.emptyMap()));
return result;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 23035491e14..d6015da0a5e 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -21,7 +21,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -177,7 +177,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
if (getJobItemProgress(jobId, i).isPresent()) {
continue;
}
- IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i,
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
+ IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
InventoryIncrementalJobItemProgress jobItemProgress =
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperContext);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
jobId, i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
@@ -267,26 +267,15 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
@Override
public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration)
pipelineJobConfig;
- TableNameSchemaNameMapping tableNameSchemaNameMapping =
getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
- IncrementalDumperContext dumperContext = buildDumperContext(jobConfig,
jobShardingItem, tableNameSchemaNameMapping);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping);
+ TableAndSchemaNameMapper tableAndSchemaNameMapper = new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+ IncrementalDumperContext dumperContext = buildDumperContext(jobConfig,
jobShardingItem, tableAndSchemaNameMapper);
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext,
importerConfig);
log.debug("buildTaskConfiguration, result={}", result);
return result;
}
- private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final
Collection<String> tableNames) {
- Map<String, String> tableNameSchemaMap = new LinkedHashMap<>();
- for (String each : tableNames) {
- String[] split = each.split("\\.");
- if (split.length > 1) {
- tableNameSchemaMap.put(split[1], split[0]);
- }
- }
- return new TableNameSchemaNameMapping(tableNameSchemaMap);
- }
-
- private IncrementalDumperContext buildDumperContext(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ private IncrementalDumperContext buildDumperContext(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
Map<ActualTableName, LogicTableName> tableNameMap = new
LinkedHashMap<>();
dataNodeLine.getEntries().forEach(each ->
each.getDataNodes().forEach(node -> tableNameMap.put(new
ActualTableName(node.getTableName()), new
LogicTableName(each.getLogicTableName()))));
@@ -297,13 +286,13 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(actualDataSourceConfig);
result.setTableNameMap(tableNameMap);
- result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+ result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
result.setDecodeWithTX(jobConfig.isDecodeWithTX());
return result;
}
private ImporterConfiguration buildImporterConfiguration(final
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig, final Collection<String> schemaTableNames,
- final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
jobConfig.getDataSourceConfig().getParameter());
CDCProcessContext processContext = new
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
@@ -311,7 +300,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
Map<LogicTableName, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor()
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 2c74e4a0d1a..218d70427d9 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -264,21 +264,21 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
- CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
incrementalDumperContext.getTableNameSchemaNameMapping());
+ CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig,
incrementalDumperContext.getTableAndSchemaNameMapper());
Set<LogicTableName> targetTableNames =
jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
Map<LogicTableName, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, incrementalDumperContext.getTableNameSchemaNameMapping());
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, incrementalDumperContext.getTableAndSchemaNameMapper());
MigrationTaskConfiguration result = new
MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(),
createTableConfig, incrementalDumperContext, importerConfig);
log.info("buildTaskConfiguration, result={}", result);
return result;
}
private CreateTableConfiguration buildCreateTableConfiguration(final
MigrationJobConfiguration jobConfig,
- final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
Collection<CreateTableEntry> createTableEntries = new LinkedList<>();
for (JobDataNodeEntry each :
jobConfig.getTablesFirstDataNodes().getEntries()) {
- String sourceSchemaName =
tableNameSchemaNameMapping.getSchemaName(each.getLogicTableName());
+ String sourceSchemaName =
tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
String targetSchemaName =
dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
DataNode dataNode = each.getDataNodes().get(0);
@@ -294,13 +294,13 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
}
private ImporterConfiguration buildImporterConfiguration(final
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
- final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ final
Map<LogicTableName, Set<String>> shardingColumnsMap, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
MigrationProcessContext processContext = new
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
JobRateLimitAlgorithm writeRateLimitAlgorithm =
processContext.getWriteRateLimitAlgorithm();
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
int retryTimes = jobConfig.getRetryTimes();
int concurrency = jobConfig.getConcurrency();
- return new ImporterConfiguration(jobConfig.getTarget(),
shardingColumnsMap, tableNameSchemaNameMapping, batchSize,
writeRateLimitAlgorithm, retryTimes, concurrency);
+ return new ImporterConfiguration(jobConfig.getTarget(),
shardingColumnsMap, tableAndSchemaNameMapper, batchSize,
writeRateLimitAlgorithm, retryTimes, concurrency);
}
@Override
@@ -372,7 +372,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
- TableNameSchemaNameMapping mapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+ TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
Connection connection = dataSource.getConnection()) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index c63c5243c52..06ee39a408b 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -41,19 +41,19 @@ public final class MigrationIncrementalDumperContextCreator
implements Increment
@Override
public IncrementalDumperContext createDumperContext(final JobDataNodeLine
jobDataNodeLine) {
Map<ActualTableName, LogicTableName> tableNameMap =
JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
- TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+ TableAndSchemaNameMapper tableAndSchemaNameMapper = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
String dataSourceName =
jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
- return buildDumperContext(jobConfig.getJobId(), dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMap,
tableNameSchemaNameMapping);
+ return buildDumperContext(jobConfig.getJobId(), dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMap,
tableAndSchemaNameMapper);
}
private IncrementalDumperContext buildDumperContext(final String jobId,
final String dataSourceName, final PipelineDataSourceConfiguration
sourceDataSource,
- final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+ final
Map<ActualTableName, LogicTableName> tableNameMap, final
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setJobId(jobId);
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(sourceDataSource);
result.setTableNameMap(tableNameMap);
- result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+ result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
return result;
}
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index cbeec358b0c..487df1e7d48 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.importer;
-import
org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -193,6 +193,6 @@ class PipelineDataSourceSinkTest {
private ImporterConfiguration mockImporterConfiguration() {
Map<LogicTableName, Set<String>> shardingColumnsMap =
Collections.singletonMap(new LogicTableName("test_table"),
Collections.singleton("user"));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, null, 3, 3);
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
}
}