This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5df6349a71b Optimize ShardingStatisticsTableCollector to support
aggregated datasource (#34083)
5df6349a71b is described below
commit 5df6349a71b9dcd460100e62d6b6544215fb2a7a
Author: jiangML <[email protected]>
AuthorDate: Tue Dec 17 11:43:53 2024 +0800
Optimize ShardingStatisticsTableCollector to support aggregated datasource
(#34083)
---
.../data/ShardingStatisticsTableCollector.java | 28 ++++++++++++++++++----
1 file changed, 23 insertions(+), 5 deletions(-)
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
index e0735b75a0c..2165b336f63 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/metadata/data/ShardingStatisticsTableCollector.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.sharding.metadata.data;
+import org.apache.shardingsphere.infra.database.DatabaseTypeEngine;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -29,10 +30,12 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSp
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereRowData;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereTableData;
import
org.apache.shardingsphere.infra.metadata.statistics.collector.ShardingSphereStatisticsCollector;
+import
org.apache.shardingsphere.infra.rule.attribute.datasource.aggregate.AggregatedDataSourceRuleAttribute;
import
org.apache.shardingsphere.sharding.metadata.data.dialect.DialectShardingStatisticsTableCollector;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.ShardingTable;
+import javax.sql.DataSource;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
@@ -81,19 +84,34 @@ public final class ShardingStatisticsTableCollector
implements ShardingSphereSta
row.add(each.getLogicTable());
row.add(dataNode.getDataSourceName());
row.add(dataNode.getTableName());
-
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(),
dataNode, row);
+
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(),
dataNode, row, rule);
tableData.getRows().add(new ShardingSphereRowData(row));
}
}
}
- private void addTableRowsAndDataLength(final Map<String, StorageUnit>
storageUnits, final DataNode dataNode, final List<Object> row) throws
SQLException {
+ private void addTableRowsAndDataLength(final Map<String, StorageUnit>
storageUnits, final DataNode dataNode, final List<Object> row, final
ShardingRule rule) throws SQLException {
+ DataSource dataSource;
+ DatabaseType databaseType;
StorageUnit storageUnit =
storageUnits.get(dataNode.getDataSourceName());
- DatabaseType databaseType = storageUnit.getStorageType();
- Optional<DialectShardingStatisticsTableCollector> dialectCollector =
DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class,
databaseType);
+ if (null != storageUnit) {
+ dataSource = storageUnit.getDataSource();
+ databaseType = storageUnit.getStorageType();
+ } else {
+ Optional<AggregatedDataSourceRuleAttribute>
aggregatedDataSourceRuleAttribute =
rule.getAttributes().findAttribute(AggregatedDataSourceRuleAttribute.class);
+ dataSource = aggregatedDataSourceRuleAttribute.map(optional ->
optional.getAggregatedDataSources().get(dataNode.getDataSourceName())).orElse(null);
+ databaseType = null != dataSource ?
DatabaseTypeEngine.getStorageType(dataSource) : null;
+ }
+ if (null != dataSource && null != databaseType) {
+ addTableRowsAndDataLength(databaseType, dataSource, dataNode, row);
+ }
+ }
+
+ private void addTableRowsAndDataLength(final DatabaseType databaseType,
final DataSource dataSource, final DataNode dataNode, final List<Object> row)
throws SQLException {
boolean isAppended = false;
+ Optional<DialectShardingStatisticsTableCollector> dialectCollector =
DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class,
databaseType);
if (dialectCollector.isPresent()) {
- try (Connection connection =
storageUnit.getDataSource().getConnection()) {
+ try (Connection connection = dataSource.getConnection()) {
isAppended = dialectCollector.get().appendRow(connection,
dataNode, row);
}
}