This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 53c91af1cb1 Improve table records count calculation in pipeline job for MySQL (#24293)
53c91af1cb1 is described below

commit 53c91af1cb16d8c5e94354a38d03c54ee59c4daf
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Feb 22 17:14:07 2023 +0800

    Improve table records count calculation in pipeline job for MySQL (#24293)
    
    * Improve table records count calculation in pipeline job
    
    * Improve buildEstimateCountSQL
    
    * Refactor getTableRecordsCount
    
    * Fix ci
    
    * Fix unit test
    
    * Improve parameter name
---
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |  9 +++++
 .../core/prepare/InventoryTaskSplitter.java        | 40 ++++++++++++++++++----
 .../fixture/FixturePipelineSQLBuilder.java         |  5 +++
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |  6 ++++
 .../sqlbuilder/MySQLPipelineSQLBuilderTest.java    |  7 ++--
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |  6 ++++
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |  6 ++++
 .../core/fixture/FixturePipelineSQLBuilder.java    |  7 ++++
 8 files changed, 77 insertions(+), 9 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 9c042a8bd35..b7ebb6dad60 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -135,6 +135,15 @@ public interface PipelineSQLBuilder extends TypedSPI {
     // TODO keep it for now, it might be used later
     String buildCountSQL(String schemaName, String tableName);
     
+    /**
+     * Build estimated count SQL.
+     *
+     * @param schemaName schema name
+     * @param tableName table name
+     * @return estimated count sql
+     */
+    Optional<String> buildEstimatedCountSQL(String schemaName, String 
tableName);
+    
     /**
      * Build query all ordering SQL.
      *
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 090e131a527..84aec4aa43d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -54,6 +54,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Inventory data task splitter.
@@ -171,21 +172,48 @@ public final class InventoryTaskSplitter {
         PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
         String schemaName = dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName()));
         String actualTableName = dumperConfig.getActualTableName();
-        // TODO with a large amount of data, count the full table will have 
performance problem
-        String sql = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
+        PipelineSQLBuilder pipelineSQLBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
jobConfig.getSourceDatabaseType());
+        Optional<String> sql = 
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
+        try {
+            if (sql.isPresent()) {
+                long result = getEstimatedCount(dataSource, sql.get());
+                return result > 0 ? result : getCount(dataSource, 
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+            }
+            return getCount(dataSource, 
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+        } catch (final SQLException ex) {
+            String uniqueKey = dumperConfig.hasUniqueKey() ? 
dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
+            throw new 
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), 
uniqueKey, ex);
+        }
+    }
+    
+    // TODO maybe need refactor after PostgreSQL support estimated count.
+    private long getEstimatedCount(final DataSource dataSource, final String 
estimatedCountSQL) throws SQLException {
         try (
                 Connection connection = dataSource.getConnection();
-                PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+                PreparedStatement preparedStatement = 
connection.prepareStatement(estimatedCountSQL)) {
+            preparedStatement.setString(1, connection.getCatalog());
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
                 resultSet.next();
                 return resultSet.getLong(1);
             }
-        } catch (final SQLException ex) {
-            String uniqueKey = dumperConfig.hasUniqueKey() ? 
dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
-            throw new 
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(), 
uniqueKey, ex);
         }
     }
     
+    private long getCount(final DataSource dataSource, final String countSQL) 
throws SQLException {
+        long startTimeMillis = System.currentTimeMillis();
+        long result;
+        try (
+                Connection connection = dataSource.getConnection();
+                PreparedStatement preparedStatement = 
connection.prepareStatement(countSQL)) {
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                resultSet.next();
+                result = resultSet.getLong(1);
+            }
+        }
+        log.info("getCountSQLResult cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
+        return result;
+    }
+    
     private Collection<IngestPosition<?>> 
getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext 
jobItemContext, final DataSource dataSource,
                                                                              
final InventoryDumperConfiguration dumperConfig) {
         Collection<IngestPosition<?>> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 136723e894b..523a43e6b96 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -73,6 +73,11 @@ public final class FixturePipelineSQLBuilder implements 
PipelineSQLBuilder {
         return "";
     }
     
+    @Override
+    public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
+        return Optional.empty();
+    }
+    
     @Override
     public String buildQueryAllOrderingSQL(final String schemaName, final 
String tableName, final String uniqueKey, final boolean firstQuery) {
         return "";
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 2007f458da1..195058b234f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -89,6 +89,12 @@ public final class MySQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuilder {
         return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS 
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", quote(column), 
quote(tableName)));
     }
     
+    @Override
+    public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
+        return Optional.of(String.format("SELECT TABLE_ROWS FROM 
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
+                getQualifiedTableName(schemaName, tableName)));
+    }
+    
     @Override
     public String getType() {
         return "MySQL";
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 026a2302c4c..8b8af46df7b 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -72,8 +72,9 @@ public final class MySQLPipelineSQLBuilderTest {
     }
     
     @Test
-    public void assertBuilderCountSQLWithoutKeyword() {
-        String actualCountSQL = sqlBuilder.buildCountSQL(null, "t_order");
-        assertThat(actualCountSQL, is("SELECT COUNT(*) FROM t_order"));
+    public void assertBuilderEstimateCountSQLWithoutKeyword() {
+        Optional<String> actual = sqlBuilder.buildEstimatedCountSQL(null, 
"t_order");
+        assertTrue(actual.isPresent());
+        assertThat(actual.get(), is("SELECT TABLE_ROWS FROM 
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = 't_order'"));
     }
 }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index ab1e1dea91b..332b89b3ea8 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -87,6 +87,12 @@ public final class OpenGaussPipelineSQLBuilder extends 
AbstractPipelineSQLBuilde
         return result.toString();
     }
     
+    @Override
+    public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
+        // TODO Support estimated count later.
+        return Optional.empty();
+    }
+    
     @Override
     public String getType() {
         return "openGauss";
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 16681849cee..3f9db6be99d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -85,6 +85,12 @@ public final class PostgreSQLPipelineSQLBuilder extends 
AbstractPipelineSQLBuild
         return result.toString();
     }
     
+    @Override
+    public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
+        // TODO Support estimated count later.
+        return Optional.empty();
+    }
+    
     @Override
     public String getType() {
         return "PostgreSQL";
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 0e85ed5c65c..98f3ce69d59 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -19,6 +19,8 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
 
+import java.util.Optional;
+
 public final class FixturePipelineSQLBuilder extends 
AbstractPipelineSQLBuilder {
     
     @Override
@@ -40,4 +42,9 @@ public final class FixturePipelineSQLBuilder extends 
AbstractPipelineSQLBuilder
     protected String getRightIdentifierQuoteString() {
         return "";
     }
+    
+    @Override
+    public Optional<String> buildEstimatedCountSQL(final String schemaName, 
final String tableName) {
+        return Optional.empty();
+    }
 }

Reply via email to