This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
     new d73212e  Add pipeline table metadata and loader (#14424)
d73212e is described below

commit d73212e7a61555b83aaaf8fee69e829e8b49db00
Author: Hongsheng Zhong <sand...@126.com>
AuthorDate: Wed Dec 29 20:28:57 2021 +0800

    Add pipeline table metadata and loader (#14424)
---
 .../consistency/DataConsistencyCheckerImpl.java    | 57 ++++++-------
 .../loader/PipelineTableMetaDataLoader.java        | 96 ++++++++++++++++++++++
 .../metadata/model/PipelineColumnMetaData.java     | 58 +++++++++++++
 .../core/metadata/model/PipelineTableMetaData.java | 71 ++++++++++++++++
 4 files changed, 254 insertions(+), 28 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 3bc358a..87033f1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -27,14 +27,14 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import org.apache.shardingsphere.infra.metadata.schema.builder.loader.common.TableMetaDataLoader;
-import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
@@ -48,8 +48,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
@@ -66,6 +66,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class DataConsistencyCheckerImpl implements DataConsistencyChecker {
 
+    private static final Map<PipelineDataSourceConfiguration, PipelineTableMetaDataLoader> TABLE_META_DATA_LOADER_MAP = new ConcurrentHashMap<>();
+
     private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
 
     // TODO replace to JobConfiguration
@@ -127,13 +129,6 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
                 jobContext.getJobConfig().getPipelineConfig().getTarget().getType(), jobContext.getJobConfig().getPipelineConfig().getTarget().getParameter());
         checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, targetDataSourceConfig.getDatabaseType().getName());
         Collection<String> logicTableNames = jobContext.getTaskConfigs().stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).distinct().collect(Collectors.toList());
-        Map<String, TableMetaData> tableMetaDataMap = getTablesColumnsMap(sourceDataSourceConfig, logicTableNames);
-        logicTableNames.forEach(each -> {
-            //TODO put to preparer
-            if (!tableMetaDataMap.containsKey(each)) {
-                throw new PipelineDataConsistencyCheckFailedException(String.format("could not get columns for table '%s'", each));
-            }
-        });
         String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getName();
         String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getName();
         SingleTableDataCalculator sourceCalculator = checkAlgorithm.getSingleTableDataCalculator(sourceDatabaseType);
@@ -144,9 +139,17 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         JobRateLimitAlgorithm rateLimitAlgorithm = jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
         try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
              PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
+            PipelineTableMetaDataLoader tableMetaDataLoader = getTableMetaDataLoader(sourceDataSourceConfig, sourceDataSource);
+            logicTableNames.forEach(each -> {
+                //TODO put to preparer
+                if (null == tableMetaDataLoader.getTableMetaData(each)) {
+                    throw new PipelineDataConsistencyCheckFailedException(String.format("could not get metadata for table '%s'", each));
+                }
+            });
             for (String each : logicTableNames) {
-                Collection<String> columnNames = tableMetaDataMap.get(each).getColumns().keySet();
-                String uniqueKey = tableMetaDataMap.get(each).getPrimaryKeyColumns().get(0);
+                PipelineTableMetaData tableMetaData = tableMetaDataLoader.getTableMetaData(each);
+                Collection<String> columnNames = tableMetaData.getColumnNames();
+                String uniqueKey = tableMetaData.getPrimaryKeys().get(0);
                 DataCalculateParameter sourceCalculateParameter = DataCalculateParameter.builder().dataSource(sourceDataSource).databaseType(sourceDatabaseType).peerDatabaseType(targetDatabaseType)
                     .logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
                 DataCalculateParameter targetCalculateParameter = DataCalculateParameter.builder().dataSource(targetDataSource).databaseType(targetDatabaseType).peerDatabaseType(sourceDatabaseType)
@@ -170,7 +173,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
                 result.put(each, calculateResultsEquals);
             }
         } catch (final ExecutionException | InterruptedException | SQLException ex) {
-            throw new PipelineDataConsistencyCheckFailedException("data check failed");
+            throw new PipelineDataConsistencyCheckFailedException("data check failed", ex);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
@@ -184,23 +187,21 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         }
     }
 
-    // TODO reuse metadata
-    private Map<String, TableMetaData> getTablesColumnsMap(final PipelineDataSourceConfiguration dataSourceConfig, final Collection<String> tableNames) {
-        try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(dataSourceConfig)) {
-            Map<String, TableMetaData> result = new LinkedHashMap<>();
-            for (String each : tableNames) {
-                Optional<TableMetaData> tableMetaDataOptional;
-                try {
-                    tableMetaDataOptional = TableMetaDataLoader.load(dataSource, each, dataSourceConfig.getDatabaseType());
-                } catch (final SQLException ex) {
-                    throw new PipelineDataConsistencyCheckFailedException(String.format("get columns failed for table '%s'", each), ex);
-                }
-                TableMetaData tableMetaData = tableMetaDataOptional.orElseThrow(() -> new PipelineDataConsistencyCheckFailedException(String.format("get metadata failed for table '%s'", each)));
-                result.put(each, tableMetaData);
+    private PipelineTableMetaDataLoader getTableMetaDataLoader(final PipelineDataSourceConfiguration sourceDataSourceConfig, final PipelineDataSourceWrapper sourceDataSource) throws SQLException {
+        PipelineTableMetaDataLoader result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
+        if (null != result) {
+            return result;
+        }
+        synchronized (TABLE_META_DATA_LOADER_MAP) {
+            result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
+            if (null != result) {
+                return result;
+            }
+            try (Connection connection = sourceDataSource.getConnection()) {
+                result = new PipelineTableMetaDataLoader(connection, "%");
+                TABLE_META_DATA_LOADER_MAP.put(sourceDataSourceConfig, result);
             }
             return result;
-        } catch (final SQLException ex) {
-            throw new PipelineDataConsistencyCheckFailedException("create data source failed", ex);
         }
     }
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
new file mode 100644
index 0000000..0ced9ce
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Pipeline table meta data loader.
+ */
+@Slf4j
+public final class PipelineTableMetaDataLoader {
+
+    private final Map<String, PipelineTableMetaData> tableMetaDataMap;
+
+    public PipelineTableMetaDataLoader(final Connection connection, final String tableNamePattern) throws SQLException {
+        this.tableMetaDataMap = loadTableMetadataMap(connection, tableNamePattern);
+    }
+
+    private Map<String, PipelineTableMetaData> loadTableMetadataMap(final Connection connection, final String tableNamePattern) throws SQLException {
+        Map<String, Map<String, PipelineColumnMetaData>> tablePipelineColumnMetaDataMap = new LinkedHashMap<>();
+        // TODO if tableNamePattern is '%', it might return an inconsistent result; an actual table such as `t_order_2` may be returned
+        try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), null, tableNamePattern, "%")) {
+            while (resultSet.next()) {
+                String tableName = resultSet.getString("TABLE_NAME");
+                Map<String, PipelineColumnMetaData> columnMetaDataMap = tablePipelineColumnMetaDataMap.computeIfAbsent(tableName, k -> new LinkedHashMap<>());
+                String columnName = resultSet.getString("COLUMN_NAME");
+                if (columnMetaDataMap.containsKey(columnName)) {
+                    continue;
+                }
+                int dataType = resultSet.getInt("DATA_TYPE");
+                Set<String> primaryKeys;
+                try {
+                    primaryKeys = loadPrimaryKeys(connection, tableName);
+                } catch (final SQLException ex) {
+                    log.error("loadPrimaryKeys failed, tableName={}", tableName);
+                    throw ex;
+                }
+                boolean primaryKey = primaryKeys.contains(columnName);
+                PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(columnName, dataType, primaryKey);
+                columnMetaDataMap.put(columnName, columnMetaData);
+            }
+        }
+        Map<String, PipelineTableMetaData> result = new LinkedHashMap<>();
+        for (Entry<String, Map<String, PipelineColumnMetaData>> entry : tablePipelineColumnMetaDataMap.entrySet()) {
+            result.put(entry.getKey(), new PipelineTableMetaData(entry.getKey(), entry.getValue()));
+        }
+        log.info("loadTableMetadataMap, result={}", result);
+        return result;
+    }
+
+    private Set<String> loadPrimaryKeys(final Connection connection, final String tableName) throws SQLException {
+        Set<String> result = new LinkedHashSet<>();
+        try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), connection.getSchema(), tableName)) {
+            while (resultSet.next()) {
+                result.add(resultSet.getString("COLUMN_NAME"));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get table metadata.
+     *
+     * @param tableName table name
+     * @return table metadata
+     */
+    public PipelineTableMetaData getTableMetaData(final String tableName) {
+        return tableMetaDataMap.get(tableName);
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
new file mode 100644
index 0000000..7f45ad4
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.util.Objects;
+
+/**
+ * Column meta data.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineColumnMetaData {
+
+    @NonNull
+    private final String name;
+
+    private final int dataType;
+
+    private final boolean primaryKey;
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final PipelineColumnMetaData that = (PipelineColumnMetaData) o;
+        return name.equals(that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name);
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
new file mode 100644
index 0000000..619af85
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline table meta data.
+ */
+@ToString
+public final class PipelineTableMetaData {
+
+    @NonNull
+    private final String name;
+
+    private final Map<String, PipelineColumnMetaData> columnMetaDataMap;
+
+    @Getter
+    private final Collection<String> columnNames;
+
+    @Getter
+    private final List<String> primaryKeys;
+
+    public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap) {
+        this.name = name;
+        this.columnMetaDataMap = columnMetaDataMap;
+        columnNames = Collections.unmodifiableCollection(columnMetaDataMap.keySet());
+        primaryKeys = Collections.unmodifiableList(columnMetaDataMap.values().stream().filter(PipelineColumnMetaData::isPrimaryKey).map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final PipelineTableMetaData that = (PipelineTableMetaData) o;
+        return name.equals(that.name);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name);
+    }
+}
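For reference, below is a minimal usage sketch of the loader API added by this commit. It is illustrative only and not part of the change: the DataSource wiring, the example class name, and the table name `t_order` are assumptions.

    import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
    import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;

    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Collection;

    public final class PipelineTableMetaDataLoaderExample {

        // Hypothetical helper: the dataSource argument and the table name "t_order" are illustrative only.
        public static void printTableMetaData(final DataSource dataSource) throws SQLException {
            try (Connection connection = dataSource.getConnection()) {
                // Load column and primary key metadata for all tables visible on the connection ("%" pattern).
                PipelineTableMetaDataLoader loader = new PipelineTableMetaDataLoader(connection, "%");
                // Look a table up by name; null means the loader did not see it.
                PipelineTableMetaData tableMetaData = loader.getTableMetaData("t_order");
                if (null == tableMetaData) {
                    throw new IllegalStateException("could not get metadata for table 't_order'");
                }
                Collection<String> columnNames = tableMetaData.getColumnNames();
                String uniqueKey = tableMetaData.getPrimaryKeys().get(0);
                System.out.printf("columns=%s, uniqueKey=%s%n", columnNames, uniqueKey);
            }
        }
    }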