This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new d73212e  Add pipeline table metadata and loader (#14424)
d73212e is described below

commit d73212e7a61555b83aaaf8fee69e829e8b49db00
Author: Hongsheng Zhong <sand...@126.com>
AuthorDate: Wed Dec 29 20:28:57 2021 +0800

    Add pipeline table metadata and loader (#14424)
---
 .../consistency/DataConsistencyCheckerImpl.java    | 57 ++++++-------
 .../loader/PipelineTableMetaDataLoader.java        | 96 ++++++++++++++++++++++
 .../metadata/model/PipelineColumnMetaData.java     | 58 +++++++++++++
 .../core/metadata/model/PipelineTableMetaData.java | 71 ++++++++++++++++
 4 files changed, 254 insertions(+), 28 deletions(-)
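
The new loader exposes per-table metadata (column names and primary key columns) through a small API. Below is a minimal usage sketch, not part of this commit: the DataSource wiring and the table name "t_order" are illustrative assumptions, while the class, constructor and getter names come from the files added here. Note that getTableMetaData returns null for tables the loader did not match.

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import javax.sql.DataSource;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;

public final class PipelineTableMetaDataUsageSketch {
    
    // dataSource and the table name "t_order" are assumptions for illustration only.
    public static void printTableInfo(final DataSource dataSource) throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            // "%" loads metadata for every table visible in the current catalog.
            PipelineTableMetaDataLoader loader = new PipelineTableMetaDataLoader(connection, "%");
            PipelineTableMetaData tableMetaData = loader.getTableMetaData("t_order");
            if (null == tableMetaData) {
                throw new SQLException("no metadata loaded for table 't_order'");
            }
            Collection<String> columnNames = tableMetaData.getColumnNames();
            // Matches the consistency checker's assumption that the table has a primary key.
            String uniqueKey = tableMetaData.getPrimaryKeys().get(0);
            System.out.printf("columns=%s, uniqueKey=%s%n", columnNames, uniqueKey);
        }
    }
}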

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
index 3bc358a..87033f1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerImpl.java
@@ -27,14 +27,14 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCheckAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.SingleTableDataCalculator;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
-import org.apache.shardingsphere.infra.metadata.schema.builder.loader.common.TableMetaDataLoader;
-import org.apache.shardingsphere.infra.metadata.schema.model.TableMetaData;
 import org.apache.shardingsphere.scaling.core.job.sqlbuilder.ScalingSQLBuilderFactory;
 
 import javax.sql.DataSource;
@@ -48,8 +48,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
@@ -66,6 +66,8 @@ import java.util.stream.Collectors;
 @Slf4j
 public final class DataConsistencyCheckerImpl implements DataConsistencyChecker {
     
+    private static final Map<PipelineDataSourceConfiguration, PipelineTableMetaDataLoader> TABLE_META_DATA_LOADER_MAP = new ConcurrentHashMap<>();
+    
     private final PipelineDataSourceFactory dataSourceFactory = new PipelineDataSourceFactory();
     
     // TODO replace to JobConfiguration
@@ -127,13 +129,6 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
                 jobContext.getJobConfig().getPipelineConfig().getTarget().getType(), jobContext.getJobConfig().getPipelineConfig().getTarget().getParameter());
         checkDatabaseTypeSupportedOrNot(supportedDatabaseTypes, targetDataSourceConfig.getDatabaseType().getName());
         Collection<String> logicTableNames = jobContext.getTaskConfigs().stream().flatMap(each -> each.getDumperConfig().getTableNameMap().values().stream()).distinct().collect(Collectors.toList());
-        Map<String, TableMetaData> tableMetaDataMap = getTablesColumnsMap(sourceDataSourceConfig, logicTableNames);
-        logicTableNames.forEach(each -> {
-            //TODO put to preparer
-            if (!tableMetaDataMap.containsKey(each)) {
-                throw new PipelineDataConsistencyCheckFailedException(String.format("could not get columns for table '%s'", each));
-            }
-        });
         String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getName();
         String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getName();
         SingleTableDataCalculator sourceCalculator = checkAlgorithm.getSingleTableDataCalculator(sourceDatabaseType);
@@ -144,9 +139,17 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         JobRateLimitAlgorithm rateLimitAlgorithm = jobContext.getRuleAlteredContext().getRateLimitAlgorithm();
         try (PipelineDataSourceWrapper sourceDataSource = dataSourceFactory.newInstance(sourceDataSourceConfig);
              PipelineDataSourceWrapper targetDataSource = dataSourceFactory.newInstance(targetDataSourceConfig)) {
+            PipelineTableMetaDataLoader tableMetaDataLoader = getTableMetaDataLoader(sourceDataSourceConfig, sourceDataSource);
+            logicTableNames.forEach(each -> {
+                //TODO put to preparer
+                if (null == tableMetaDataLoader.getTableMetaData(each)) {
+                    throw new PipelineDataConsistencyCheckFailedException(String.format("could not get metadata for table '%s'", each));
+                }
+            });
             for (String each : logicTableNames) {
-                Collection<String> columnNames = tableMetaDataMap.get(each).getColumns().keySet();
-                String uniqueKey = tableMetaDataMap.get(each).getPrimaryKeyColumns().get(0);
+                PipelineTableMetaData tableMetaData = tableMetaDataLoader.getTableMetaData(each);
+                Collection<String> columnNames = tableMetaData.getColumnNames();
+                String uniqueKey = tableMetaData.getPrimaryKeys().get(0);
                 DataCalculateParameter sourceCalculateParameter = DataCalculateParameter.builder().dataSource(sourceDataSource).databaseType(sourceDatabaseType).peerDatabaseType(targetDatabaseType)
                     .logicTableName(each).columnNames(columnNames).uniqueKey(uniqueKey).build();
                 DataCalculateParameter targetCalculateParameter = DataCalculateParameter.builder().dataSource(targetDataSource).databaseType(targetDatabaseType).peerDatabaseType(sourceDatabaseType)
@@ -170,7 +173,7 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
                 result.put(each, calculateResultsEquals);
             }
         } catch (final ExecutionException | InterruptedException | SQLException ex) {
-            throw new PipelineDataConsistencyCheckFailedException("data check failed");
+            throw new PipelineDataConsistencyCheckFailedException("data check failed", ex);
         } finally {
             executor.shutdown();
             executor.shutdownNow();
@@ -184,23 +187,21 @@ public final class DataConsistencyCheckerImpl implements DataConsistencyChecker
         }
     }
     
-    // TODO reuse metadata
-    private Map<String, TableMetaData> getTablesColumnsMap(final PipelineDataSourceConfiguration dataSourceConfig, final Collection<String> tableNames) {
-        try (PipelineDataSourceWrapper dataSource = dataSourceFactory.newInstance(dataSourceConfig)) {
-            Map<String, TableMetaData> result = new LinkedHashMap<>();
-            for (String each : tableNames) {
-                Optional<TableMetaData> tableMetaDataOptional;
-                try {
-                    tableMetaDataOptional = TableMetaDataLoader.load(dataSource, each, dataSourceConfig.getDatabaseType());
-                } catch (final SQLException ex) {
-                    throw new PipelineDataConsistencyCheckFailedException(String.format("get columns failed for table '%s'", each), ex);
-                }
-                TableMetaData tableMetaData = tableMetaDataOptional.orElseThrow(() -> new PipelineDataConsistencyCheckFailedException(String.format("get metadata failed for table '%s'", each)));
-                result.put(each, tableMetaData);
+    private PipelineTableMetaDataLoader getTableMetaDataLoader(final PipelineDataSourceConfiguration sourceDataSourceConfig, final PipelineDataSourceWrapper sourceDataSource) throws SQLException {
+        PipelineTableMetaDataLoader result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
+        if (null != result) {
+            return result;
+        }
+        synchronized (TABLE_META_DATA_LOADER_MAP) {
+            result = TABLE_META_DATA_LOADER_MAP.get(sourceDataSourceConfig);
+            if (null != result) {
+                return result;
+            }
+            try (Connection connection = sourceDataSource.getConnection()) {
+                result = new PipelineTableMetaDataLoader(connection, "%");
+                TABLE_META_DATA_LOADER_MAP.put(sourceDataSourceConfig, result);
             }
             return result;
-        } catch (final SQLException ex) {
-            throw new PipelineDataConsistencyCheckFailedException("create data source failed", ex);
         }
     }
 }
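
getTableMetaDataLoader above caches one loader per source data source configuration in a static ConcurrentHashMap, re-checking the map inside a synchronized block so the (potentially expensive) metadata load runs only once per configuration; ConcurrentHashMap.computeIfAbsent is not usable here because the loader constructor throws a checked SQLException. A standalone sketch of the same pattern, with placeholder generic types instead of the pipeline classes:

import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class DoubleCheckedCacheSketch<K, V> {
    
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    
    public V getOrCreate(final K key, final ThrowingFactory<K, V> factory) throws SQLException {
        V result = cache.get(key);
        if (null != result) {
            return result;
        }
        // Serialize creation so the factory runs at most once per key.
        synchronized (cache) {
            result = cache.get(key);
            if (null != result) {
                return result;
            }
            result = factory.create(key);
            cache.put(key, result);
            return result;
        }
    }
    
    @FunctionalInterface
    public interface ThrowingFactory<K, V> {
        
        V create(K key) throws SQLException;
    }
}
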
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
new file mode 100644
index 0000000..0ced9ce
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Pipeline table meta data loader.
+ */
+@Slf4j
+public final class PipelineTableMetaDataLoader {
+    
+    private final Map<String, PipelineTableMetaData> tableMetaDataMap;
+    
+    public PipelineTableMetaDataLoader(final Connection connection, final String tableNamePattern) throws SQLException {
+        this.tableMetaDataMap = loadTableMetadataMap(connection, tableNamePattern);
+    }
+    
+    private Map<String, PipelineTableMetaData> loadTableMetadataMap(final Connection connection, final String tableNamePattern) throws SQLException {
+        Map<String, Map<String, PipelineColumnMetaData>> tablePipelineColumnMetaDataMap = new LinkedHashMap<>();
+        // TODO if tableNamePattern is '%', it might return an inconsistent result; an actual table such as `t_order_2` may be returned
+        try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), null, tableNamePattern, "%")) {
+            while (resultSet.next()) {
+                String tableName = resultSet.getString("TABLE_NAME");
+                Map<String, PipelineColumnMetaData> columnMetaDataMap = tablePipelineColumnMetaDataMap.computeIfAbsent(tableName, k -> new LinkedHashMap<>());
+                String columnName = resultSet.getString("COLUMN_NAME");
+                if (columnMetaDataMap.containsKey(columnName)) {
+                    continue;
+                }
+                int dataType = resultSet.getInt("DATA_TYPE");
+                Set<String> primaryKeys;
+                try {
+                    primaryKeys = loadPrimaryKeys(connection, tableName);
+                } catch (final SQLException ex) {
+                    log.error("loadPrimaryKeys failed, tableName={}", tableName);
+                    throw ex;
+                }
+                boolean primaryKey = primaryKeys.contains(columnName);
+                PipelineColumnMetaData columnMetaData = new PipelineColumnMetaData(columnName, dataType, primaryKey);
+                columnMetaDataMap.put(columnName, columnMetaData);
+            }
+        }
+        Map<String, PipelineTableMetaData> result = new LinkedHashMap<>();
+        for (Entry<String, Map<String, PipelineColumnMetaData>> entry : tablePipelineColumnMetaDataMap.entrySet()) {
+            result.put(entry.getKey(), new PipelineTableMetaData(entry.getKey(), entry.getValue()));
+        }
+        log.info("loadTableMetadataMap, result={}", result);
+        return result;
+    }
+    
+    private Set<String> loadPrimaryKeys(final Connection connection, final String tableName) throws SQLException {
+        Set<String> result = new LinkedHashSet<>();
+        try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), connection.getSchema(), tableName)) {
+            while (resultSet.next()) {
+                result.add(resultSet.getString("COLUMN_NAME"));
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Get table metadata.
+     *
+     * @param tableName table name
+     * @return table metadata
+     */
+    public PipelineTableMetaData getTableMetaData(final String tableName) {
+        return tableMetaDataMap.get(tableName);
+    }
+}
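
PipelineTableMetaDataLoader builds its map from the standard JDBC DatabaseMetaData API. The sketch below shows the two raw calls it wraps, getColumns and getPrimaryKeys; the table name "t_order" is an assumption for illustration.

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;

public final class JdbcMetaDataSketch {
    
    public static void dumpTableInfo(final Connection connection) throws SQLException {
        DatabaseMetaData metaData = connection.getMetaData();
        // getColumns: one row per column; TABLE_NAME, COLUMN_NAME and DATA_TYPE are the fields the loader reads.
        try (ResultSet resultSet = metaData.getColumns(connection.getCatalog(), null, "t_order", "%")) {
            while (resultSet.next()) {
                System.out.printf("%s.%s type=%d%n",
                        resultSet.getString("TABLE_NAME"), resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE"));
            }
        }
        // getPrimaryKeys: one row per primary key column of the given table.
        try (ResultSet resultSet = metaData.getPrimaryKeys(connection.getCatalog(), connection.getSchema(), "t_order")) {
            while (resultSet.next()) {
                System.out.println("primary key: " + resultSet.getString("COLUMN_NAME"));
            }
        }
    }
}
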
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
new file mode 100644
index 0000000..7f45ad4
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineColumnMetaData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import java.util.Objects;
+
+/**
+ * Column meta data.
+ */
+@RequiredArgsConstructor
+@Getter
+@ToString
+public final class PipelineColumnMetaData {
+    
+    @NonNull
+    private final String name;
+    
+    private final int dataType;
+    
+    private final boolean primaryKey;
+    
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final PipelineColumnMetaData that = (PipelineColumnMetaData) o;
+        return name.equals(that.name);
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hash(name);
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
new file mode 100644
index 0000000..619af85
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.ToString;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline table meta data.
+ */
+@ToString
+public final class PipelineTableMetaData {
+    
+    @NonNull
+    private final String name;
+    
+    private final Map<String, PipelineColumnMetaData> columnMetaDataMap;
+    
+    @Getter
+    private final Collection<String> columnNames;
+    
+    @Getter
+    private final List<String> primaryKeys;
+    
+    public PipelineTableMetaData(final String name, final Map<String, PipelineColumnMetaData> columnMetaDataMap) {
+        this.name = name;
+        this.columnMetaDataMap = columnMetaDataMap;
+        columnNames = Collections.unmodifiableCollection(columnMetaDataMap.keySet());
+        primaryKeys = Collections.unmodifiableList(columnMetaDataMap.values().stream().filter(PipelineColumnMetaData::isPrimaryKey).map(PipelineColumnMetaData::getName).collect(Collectors.toList()));
+    }
+    
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final PipelineTableMetaData that = (PipelineTableMetaData) o;
+        return name.equals(that.name);
+    }
+    
+    @Override
+    public int hashCode() {
+        return Objects.hash(name);
+    }
+}
