This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 53c91af1cb1 Improve table records count calculation in pipeline job for MySQL (#24293)
53c91af1cb1 is described below
commit 53c91af1cb16d8c5e94354a38d03c54ee59c4daf
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Feb 22 17:14:07 2023 +0800
Improve table records count calculation in pipeline job for MySQL (#24293)
* Improve table records count calculation in pipeline job
* Improve buildEstimateCountSQL
* Refactor getTableRecordsCount
* Fix ci
* Fix unit test
* Improve parameter name
---
.../spi/sqlbuilder/PipelineSQLBuilder.java | 9 +++++
.../core/prepare/InventoryTaskSplitter.java | 40 ++++++++++++++++++----
.../fixture/FixturePipelineSQLBuilder.java | 5 +++
.../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java | 6 ++++
.../sqlbuilder/MySQLPipelineSQLBuilderTest.java | 7 ++--
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 6 ++++
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 6 ++++
.../core/fixture/FixturePipelineSQLBuilder.java | 7 ++++
8 files changed, 77 insertions(+), 9 deletions(-)
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index 9c042a8bd35..b7ebb6dad60 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -135,6 +135,15 @@ public interface PipelineSQLBuilder extends TypedSPI {
// TODO keep it for now, it might be used later
String buildCountSQL(String schemaName, String tableName);
+ /**
+ * Build estimated count SQL.
+ *
+ * @param schemaName schema name
+ * @param tableName table name
+ * @return estimated count sql
+ */
+ Optional<String> buildEstimatedCountSQL(String schemaName, String
tableName);
+
/**
* Build query all ordering SQL.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 090e131a527..84aec4aa43d 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -54,6 +54,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
/**
* Inventory data task splitter.
@@ -171,21 +172,48 @@ public final class InventoryTaskSplitter {
PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
String schemaName = dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
- // TODO with a large amount of data, count the full table will have
performance problem
- String sql =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName, actualTableName);
+ PipelineSQLBuilder pipelineSQLBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
jobConfig.getSourceDatabaseType());
+ Optional<String> sql =
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
+ try {
+ if (sql.isPresent()) {
+ long result = getEstimatedCount(dataSource, sql.get());
+ return result > 0 ? result : getCount(dataSource,
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+ }
+ return getCount(dataSource,
pipelineSQLBuilder.buildCountSQL(schemaName, actualTableName));
+ } catch (final SQLException ex) {
+ String uniqueKey = dumperConfig.hasUniqueKey() ?
dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
+ throw new
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(),
uniqueKey, ex);
+ }
+ }
+
+ // TODO maybe need refactor after PostgreSQL support estimated count.
+ private long getEstimatedCount(final DataSource dataSource, final String
estimatedCountSQL) throws SQLException {
try (
Connection connection = dataSource.getConnection();
- PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ PreparedStatement preparedStatement =
connection.prepareStatement(estimatedCountSQL)) {
+ preparedStatement.setString(1, connection.getCatalog());
try (ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return resultSet.getLong(1);
}
- } catch (final SQLException ex) {
- String uniqueKey = dumperConfig.hasUniqueKey() ?
dumperConfig.getUniqueKeyColumns().get(0).getName() : "";
- throw new
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(),
uniqueKey, ex);
}
}
+ private long getCount(final DataSource dataSource, final String countSQL)
throws SQLException {
+ long startTimeMillis = System.currentTimeMillis();
+ long result;
+ try (
+ Connection connection = dataSource.getConnection();
+ PreparedStatement preparedStatement =
connection.prepareStatement(countSQL)) {
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ resultSet.next();
+ result = resultSet.getLong(1);
+ }
+ }
+ log.info("getCountSQLResult cost {} ms", System.currentTimeMillis() -
startTimeMillis);
+ return result;
+ }
+
private Collection<IngestPosition<?>>
getPositionByIntegerUniqueKeyRange(final InventoryIncrementalJobItemContext
jobItemContext, final DataSource dataSource,
final InventoryDumperConfiguration dumperConfig) {
Collection<IngestPosition<?>> result = new LinkedList<>();
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
index 136723e894b..523a43e6b96 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/fixture/FixturePipelineSQLBuilder.java
@@ -73,6 +73,11 @@ public final class FixturePipelineSQLBuilder implements PipelineSQLBuilder {
return "";
}
+ @Override
+ public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
+ return Optional.empty();
+ }
+
@Override
public String buildQueryAllOrderingSQL(final String schemaName, final
String tableName, final String uniqueKey, final boolean firstQuery) {
return "";
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index 2007f458da1..195058b234f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -89,6 +89,12 @@ public final class MySQLPipelineSQLBuilder extends AbstractPipelineSQLBuilder {
return Optional.of(String.format("SELECT BIT_XOR(CAST(CRC32(%s) AS
UNSIGNED)) AS checksum, COUNT(1) AS cnt FROM %s", quote(column),
quote(tableName)));
}
+ @Override
+ public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
+ return Optional.of(String.format("SELECT TABLE_ROWS FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = '%s'",
+ getQualifiedTableName(schemaName, tableName)));
+ }
+
@Override
public String getType() {
return "MySQL";
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
index 026a2302c4c..8b8af46df7b 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilderTest.java
@@ -72,8 +72,9 @@ public final class MySQLPipelineSQLBuilderTest {
}
@Test
- public void assertBuilderCountSQLWithoutKeyword() {
- String actualCountSQL = sqlBuilder.buildCountSQL(null, "t_order");
- assertThat(actualCountSQL, is("SELECT COUNT(*) FROM t_order"));
+ public void assertBuilderEstimateCountSQLWithoutKeyword() {
+ Optional<String> actual = sqlBuilder.buildEstimatedCountSQL(null,
"t_order");
+ assertTrue(actual.isPresent());
+ assertThat(actual.get(), is("SELECT TABLE_ROWS FROM
INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = 't_order'"));
}
}
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index ab1e1dea91b..332b89b3ea8 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -87,6 +87,12 @@ public final class OpenGaussPipelineSQLBuilder extends AbstractPipelineSQLBuilde
return result.toString();
}
+ @Override
+ public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
+ // TODO Support estimated count later.
+ return Optional.empty();
+ }
+
@Override
public String getType() {
return "openGauss";
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 16681849cee..3f9db6be99d 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -85,6 +85,12 @@ public final class PostgreSQLPipelineSQLBuilder extends AbstractPipelineSQLBuild
return result.toString();
}
+ @Override
+ public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
+ // TODO Support estimated count later.
+ return Optional.empty();
+ }
+
@Override
public String getType() {
return "PostgreSQL";
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
index 0e85ed5c65c..98f3ce69d59 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixturePipelineSQLBuilder.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.AbstractPipelineSQLBuilder;
+import java.util.Optional;
+
public final class FixturePipelineSQLBuilder extends
AbstractPipelineSQLBuilder {
@Override
@@ -40,4 +42,9 @@ public final class FixturePipelineSQLBuilder extends AbstractPipelineSQLBuilder
protected String getRightIdentifierQuoteString() {
return "";
}
+
+ @Override
+ public Optional<String> buildEstimatedCountSQL(final String schemaName,
final String tableName) {
+ return Optional.empty();
+ }
}