This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c1808284b2c Refactor StatisticsRefreshEngine (#34445)
c1808284b2c is described below

commit c1808284b2cc38f710e836d8acae6bb2d20d879c
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jan 23 21:19:37 2025 +0800

    Refactor StatisticsRefreshEngine (#34445)
    
    * Refactor StatisticsRefreshEngine
    
    * Refactor StatisticsRefreshEngine
---
 .../statistics/StatisticsRefreshEngine.java        | 61 +++++++++++-----------
 1 file changed, 31 insertions(+), 30 deletions(-)

diff --git a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
index 0fa584b9104..273f5edd33e 100644
--- a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
+++ b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/statistics/StatisticsRefreshEngine.java
@@ -84,7 +84,7 @@ public final class StatisticsRefreshEngine {
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
-            log.error("Collect data error", ex);
+            log.error("Collect statistics error.", ex);
         }
     }
     
@@ -92,15 +92,15 @@ public final class StatisticsRefreshEngine {
         GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new 
StatisticsLock());
         if (lockContext.tryLock(lockDefinition, 5000L)) {
             try {
-                ShardingSphereStatistics statistics = 
contextManager.getMetaDataContexts().getStatistics();
+                ShardingSphereStatistics currentStatistics = 
contextManager.getMetaDataContexts().getStatistics();
                 ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
                 ShardingSphereStatistics changedStatistics = new 
ShardingSphereStatistics();
-                for (Entry<String, DatabaseStatistics> entry : 
statistics.getDatabaseStatisticsMap().entrySet()) {
+                for (Entry<String, DatabaseStatistics> entry : 
currentStatistics.getDatabaseStatisticsMap().entrySet()) {
                     if (metaData.containsDatabase(entry.getKey())) {
                         collectForDatabase(entry.getKey(), entry.getValue(), 
metaData, changedStatistics);
                     }
                 }
-                compareAndUpdate(changedStatistics);
+                compareAndUpdate(currentStatistics, changedStatistics, 
metaData);
             } finally {
                 lockContext.unlock(lockDefinition);
             }
@@ -126,16 +126,19 @@ public final class StatisticsRefreshEngine {
     
     private void collectForTable(final String databaseName, final String 
schemaName, final ShardingSphereTable table,
                                  final ShardingSphereMetaData metaData, final 
ShardingSphereStatistics statistics) {
-        Optional<TableStatisticsCollector> statisticsCollector = 
TypedSPILoader.findService(TableStatisticsCollector.class, table.getName());
-        Optional<TableStatistics> tableStatistics = Optional.empty();
-        if (statisticsCollector.isPresent()) {
+        Optional<TableStatisticsCollector> tableStatisticsCollector = 
TypedSPILoader.findService(TableStatisticsCollector.class, table.getName());
+        Optional<TableStatistics> tableStatistics;
+        if (tableStatisticsCollector.isPresent()) {
             try {
-                tableStatistics = 
statisticsCollector.get().collect(databaseName, table, metaData);
+                tableStatistics = 
tableStatisticsCollector.get().collect(databaseName, table, metaData);
                 // CHECKSTYLE:OFF
             } catch (final Exception ex) {
                 // CHECKSTYLE:ON
-                log.error(String.format("Collect %s.%s.%s data failed", 
databaseName, schemaName, table.getName()), ex);
+                log.error("Collect {}.{}.{} statistics failed.", databaseName, 
schemaName, table.getName(), ex);
+                tableStatistics = Optional.empty();
             }
+        } else {
+            tableStatistics = Optional.empty();
         }
         DatabaseStatistics databaseStatistics = 
statistics.containsDatabaseStatistics(databaseName) ? 
statistics.getDatabaseStatistics(databaseName) : new DatabaseStatistics();
         SchemaStatistics schemaStatistics = 
databaseStatistics.containsSchemaStatistics(schemaName) ? 
databaseStatistics.getSchemaStatistics(schemaName) : new SchemaStatistics();
@@ -144,47 +147,45 @@ public final class StatisticsRefreshEngine {
         statistics.putDatabaseStatistics(databaseName, databaseStatistics);
     }
     
-    private void compareAndUpdate(final ShardingSphereStatistics 
changedStatistics) {
-        ShardingSphereMetaData metaData = 
contextManager.getMetaDataContexts().getMetaData();
-        ShardingSphereStatistics statistics = 
contextManager.getMetaDataContexts().getStatistics();
+    private void compareAndUpdate(final ShardingSphereStatistics 
currentStatistics, final ShardingSphereStatistics changedStatistics, final 
ShardingSphereMetaData metaData) {
         for (Entry<String, DatabaseStatistics> entry : 
changedStatistics.getDatabaseStatisticsMap().entrySet()) {
-            compareAndUpdateForDatabase(entry.getKey(), 
statistics.getDatabaseStatistics(entry.getKey()), entry.getValue(), statistics, 
metaData.getDatabase(entry.getKey()));
+            compareAndUpdateForDatabase(metaData.getDatabase(entry.getKey()), 
currentStatistics, currentStatistics.getDatabaseStatistics(entry.getKey()), 
entry.getValue());
         }
-        for (Entry<String, DatabaseStatistics> entry : 
statistics.getDatabaseStatisticsMap().entrySet()) {
+        for (Entry<String, DatabaseStatistics> entry : 
currentStatistics.getDatabaseStatisticsMap().entrySet()) {
             if (!changedStatistics.containsDatabaseStatistics(entry.getKey())) 
{
-                statistics.dropDatabaseStatistics(entry.getKey());
+                currentStatistics.dropDatabaseStatistics(entry.getKey());
                 
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereStatisticsPersistService().delete(entry.getKey());
             }
         }
     }
     
-    private void compareAndUpdateForDatabase(final String databaseName, final 
DatabaseStatistics databaseStatistics, final DatabaseStatistics 
changedDatabaseStatistics,
-                                             final ShardingSphereStatistics 
statistics, final ShardingSphereDatabase database) {
+    private void compareAndUpdateForDatabase(final ShardingSphereDatabase 
database, final ShardingSphereStatistics currentStatistics,
+                                             final DatabaseStatistics 
currentDatabaseStatistics, final DatabaseStatistics changedDatabaseStatistics) {
         for (Entry<String, SchemaStatistics> entry : 
changedDatabaseStatistics.getSchemaStatisticsMap().entrySet()) {
-            compareAndUpdateForSchema(databaseName, entry.getKey(), 
databaseStatistics.getSchemaStatistics(entry.getKey()), entry.getValue(), 
statistics, database.getSchema(entry.getKey()));
+            compareAndUpdateForSchema(database.getName(), 
database.getSchema(entry.getKey()), currentStatistics, 
currentDatabaseStatistics.getSchemaStatistics(entry.getKey()), 
entry.getValue());
         }
     }
     
-    private void compareAndUpdateForSchema(final String databaseName, final 
String schemaName, final SchemaStatistics schemaStatistics,
-                                           final SchemaStatistics 
changedSchemaStatistics, final ShardingSphereStatistics statistics, final 
ShardingSphereSchema schema) {
+    private void compareAndUpdateForSchema(final String databaseName, final 
ShardingSphereSchema schema, final ShardingSphereStatistics currentStatistics,
+                                           final SchemaStatistics 
currentSchemaStatistics, final SchemaStatistics changedSchemaStatistics) {
         for (Entry<String, TableStatistics> entry : 
changedSchemaStatistics.getTableStatisticsMap().entrySet()) {
-            compareAndUpdateForTable(databaseName, schemaName, 
schemaStatistics.getTableStatistics(entry.getKey()), entry.getValue(), 
statistics, schema.getTable(entry.getKey()));
+            compareAndUpdateForTable(databaseName, schema.getName(), 
schema.getTable(entry.getKey()), currentStatistics, 
currentSchemaStatistics.getTableStatistics(entry.getKey()), entry.getValue());
         }
     }
     
-    private void compareAndUpdateForTable(final String databaseName, final 
String schemaName, final TableStatistics tableStatistics,
-                                          final TableStatistics 
changedTableStatistics, final ShardingSphereStatistics statistics, final 
ShardingSphereTable table) {
-        if (!tableStatistics.equals(changedTableStatistics)) {
-            
statistics.getDatabaseStatistics(databaseName).getSchemaStatistics(schemaName).putTableStatistics(changedTableStatistics.getName(),
 changedTableStatistics);
-            AlteredDatabaseStatistics alteredDatabaseStatistics = 
createAlteredDatabaseStatistics(databaseName, schemaName, tableStatistics, 
changedTableStatistics, table);
+    private void compareAndUpdateForTable(final String databaseName, final 
String schemaName, final ShardingSphereTable table,
+                                          final ShardingSphereStatistics 
currentStatistics, final TableStatistics currentTableStatistics, final 
TableStatistics changedTableStatistics) {
+        if (!currentTableStatistics.equals(changedTableStatistics)) {
+            
currentStatistics.getDatabaseStatistics(databaseName).getSchemaStatistics(schemaName).putTableStatistics(changedTableStatistics.getName(),
 changedTableStatistics);
+            AlteredDatabaseStatistics alteredDatabaseStatistics = 
createAlteredDatabaseStatistics(databaseName, schemaName, table, 
currentTableStatistics, changedTableStatistics);
             
contextManager.getPersistServiceFacade().getMetaDataPersistService().getShardingSphereStatisticsPersistService().update(alteredDatabaseStatistics);
         }
     }
     
-    private AlteredDatabaseStatistics createAlteredDatabaseStatistics(final 
String databaseName, final String schemaName, final TableStatistics 
tableStatistics,
-                                                                      final 
TableStatistics changedTableStatistics, final ShardingSphereTable table) {
-        AlteredDatabaseStatistics result = new 
AlteredDatabaseStatistics(databaseName, schemaName, tableStatistics.getName());
-        Map<String, RowStatistics> tableStatisticsMap = 
tableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
 Function.identity()));
+    private AlteredDatabaseStatistics createAlteredDatabaseStatistics(final 
String databaseName, final String schemaName, final ShardingSphereTable table,
+                                                                      final 
TableStatistics currentTableStatistics, final TableStatistics 
changedTableStatistics) {
+        AlteredDatabaseStatistics result = new 
AlteredDatabaseStatistics(databaseName, schemaName, 
currentTableStatistics.getName());
+        Map<String, RowStatistics> tableStatisticsMap = 
currentTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
 Function.identity()));
         Map<String, RowStatistics> changedTableStatisticsMap = 
changedTableStatistics.getRows().stream().collect(Collectors.toMap(RowStatistics::getUniqueKey,
 Function.identity()));
         YamlRowStatisticsSwapper swapper = new YamlRowStatisticsSwapper(new 
ArrayList<>(table.getAllColumns()));
         for (Entry<String, RowStatistics> entry : 
changedTableStatisticsMap.entrySet()) {

Reply via email to