This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 61444f8e08d Refactor MetaDataRefreshEngine.refreshFederation() (#34450)
61444f8e08d is described below

commit 61444f8e08d587e2b8435035b054df6414de02cf
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Jan 23 22:03:26 2025 +0800

    Refactor MetaDataRefreshEngine.refreshFederation() (#34450)
    
    * Move package of FederationMetaDataRefresher
    
    * Refactor MetaDataRefreshEngine
    
    * Refactor MetaDataRefreshEngine.refreshFederation()
    
    * Refactor MetaDataRefreshEngine.refreshFederation()
---
 .../executor/engine/DriverExecuteExecutor.java     | 13 ++---
 .../jdbc/DriverJDBCPushDownExecuteExecutor.java    |  8 +--
 .../DriverJDBCPushDownExecuteUpdateExecutor.java   | 12 ++--
 .../refresher/metadata/MetaDataRefreshEngine.java  | 66 +++++++++-------------
 .../connector/StandardDatabaseConnector.java       | 17 +++---
 5 files changed, 47 insertions(+), 69 deletions(-)

diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
index 7d7c20cbd36..4c780e46c27 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/DriverExecuteExecutor.java
@@ -101,23 +101,18 @@ public final class DriverExecuteExecutor {
                     new 
ExecuteQueryCallbackFactory(prepareEngine.getType()).newInstance(database, 
queryContext), new SQLFederationContext(false, queryContext, metaData, 
connection.getProcessId()));
             return null != resultSet;
         }
-        MetaDataRefreshEngine metaDataRefreshEngine = 
getMetaDataRefreshEngine(database);
-        if (sqlFederationEngine.enabled() && 
metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
-            
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
+        if (sqlFederationEngine.enabled()) {
+            new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps())
+                    .refreshFederation(queryContext.getSqlStatementContext());
             return true;
         }
         if (transactionExecutor.decide(queryContext)) {
             return transactionExecutor.execute((TCLStatement) 
queryContext.getSqlStatementContext().getSqlStatement());
         }
-        ExecutionContext executionContext =
-                new KernelProcessor().generateExecutionContext(queryContext, 
metaData.getGlobalRuleMetaData(), metaData.getProps());
+        ExecutionContext executionContext = new 
KernelProcessor().generateExecutionContext(queryContext, 
metaData.getGlobalRuleMetaData(), metaData.getProps());
         return executePushDown(database, executionContext, prepareEngine, 
executeCallback, addCallback, replayCallback);
     }
     
-    private MetaDataRefreshEngine getMetaDataRefreshEngine(final 
ShardingSphereDatabase database) {
-        return new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps());
-    }
-    
     @SuppressWarnings("rawtypes")
     private boolean executePushDown(final ShardingSphereDatabase database, 
final ExecutionContext executionContext, final 
DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine,
                                     final StatementExecuteCallback 
executeCallback, final StatementAddCallback addCallback, final 
StatementReplayCallback replayCallback) throws SQLException {
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
index 23581b492e2..171ef59380e 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteExecutor.java
@@ -104,13 +104,11 @@ public final class DriverJDBCPushDownExecuteExecutor {
             processEngine.executeSQL(executionGroupContext, 
executionContext.getQueryContext());
             List<Boolean> results = jdbcExecutor.execute(executionGroupContext,
                     new 
ExecuteCallbackFactory(prepareEngine.getType()).newInstance(database, 
executeCallback, executionContext.getSqlStatementContext().getSqlStatement()));
-            if 
(isNeedImplicitCommit(executionContext.getQueryContext().getSqlStatementContext()))
 {
+            if 
(isNeedImplicitCommit(executionContext.getSqlStatementContext())) {
                 connection.commit();
             }
-            if 
(MetaDataRefreshEngine.isRefreshMetaDataRequired(executionContext.getSqlStatementContext()))
 {
-                new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps())
-                        
.refresh(executionContext.getQueryContext().getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
-            }
+            new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, metaData.getProps())
+                    .refresh(executionContext.getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
             return null != results && !results.isEmpty() && null != 
results.get(0) && results.get(0);
         } finally {
             
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
diff --git 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
index cd87cca94ca..db4d9671ed3 100644
--- 
a/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
+++ 
b/jdbc/src/main/java/org/apache/shardingsphere/driver/executor/engine/pushdown/jdbc/DriverJDBCPushDownExecuteUpdateExecutor.java
@@ -106,16 +106,16 @@ public final class 
DriverJDBCPushDownExecuteUpdateExecutor {
         try {
             processEngine.executeSQL(executionGroupContext, 
executionContext.getQueryContext());
             JDBCExecutorCallback<Integer> callback = new 
ExecuteUpdateCallbackFactory(prepareEngine.getType())
-                    .newInstance(database, 
executionContext.getQueryContext().getSqlStatementContext().getSqlStatement(), 
updateCallback);
+                    .newInstance(database, 
executionContext.getSqlStatementContext().getSqlStatement(), updateCallback);
             List<Integer> updateCounts = 
jdbcExecutor.execute(executionGroupContext, callback);
-            if 
(MetaDataRefreshEngine.isRefreshMetaDataRequired(executionContext.getQueryContext().getSqlStatementContext()))
 {
-                if 
(isNeedImplicitCommit(executionContext.getQueryContext().getSqlStatementContext()))
 {
+            MetaDataRefreshEngine metaDataRefreshEngine = new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, props);
+            if 
(metaDataRefreshEngine.isNeedRefreshMetaData(executionContext.getSqlStatementContext()))
 {
+                if 
(isNeedImplicitCommit(executionContext.getSqlStatementContext())) {
                     connection.commit();
                 }
-                new 
MetaDataRefreshEngine(connection.getContextManager().getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, props)
-                        
.refresh(executionContext.getQueryContext().getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
+                
metaDataRefreshEngine.refresh(executionContext.getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
             }
-            return isNeedAccumulate(database.getRuleMetaData().getRules(), 
executionContext.getQueryContext().getSqlStatementContext()) ? 
accumulate(updateCounts) : updateCounts.get(0);
+            return isNeedAccumulate(database.getRuleMetaData().getRules(), 
executionContext.getSqlStatementContext()) ? accumulate(updateCounts) : 
updateCounts.get(0);
         } finally {
             
processEngine.completeSQLExecution(executionGroupContext.getReportContext().getProcessId());
         }
diff --git 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
index 4a75931c60c..30bde2fee73 100644
--- 
a/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/refresher/metadata/MetaDataRefreshEngine.java
@@ -67,6 +67,17 @@ public final class MetaDataRefreshEngine {
     
     private final ConfigurationProperties props;
     
+    /**
+     * Whether to need refresh meta data.
+     *
+     * @param sqlStatementContext SQL statement context
+     * @return is need refresh meta data or not
+     */
+    public boolean isNeedRefreshMetaData(final SQLStatementContext 
sqlStatementContext) {
+        Class<?> sqlStatementClass = 
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
+        return DDL_STATEMENT_CLASSES.contains(sqlStatementClass);
+    }
+    
     /**
      * Refresh meta data.
      *
@@ -76,29 +87,19 @@ public final class MetaDataRefreshEngine {
      */
     @SuppressWarnings({"unchecked", "rawtypes"})
     public void refresh(final SQLStatementContext sqlStatementContext, final 
Collection<RouteUnit> routeUnits) throws SQLException {
-        Class sqlStatementClass = 
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
-        if (!DDL_STATEMENT_CLASSES.contains(sqlStatementClass)) {
+        if (!isNeedRefreshMetaData(sqlStatementContext)) {
             return;
         }
-        Optional<MetaDataRefresher> schemaRefresher = 
TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass);
-        if (schemaRefresher.isPresent()) {
-            Collection<String> logicDataSourceNames = 
routeUnits.stream().map(each -> 
each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
-            String schemaName = sqlStatementContext instanceof TableAvailable 
? getSchemaName(sqlStatementContext) : null;
-            DatabaseType databaseType = routeUnits.stream().map(each -> 
database.getResourceMetaData().getStorageUnits().get(each.getDataSourceMapper().getActualName()))
-                    
.filter(Objects::nonNull).findFirst().map(StorageUnit::getStorageType).orElseGet(sqlStatementContext::getDatabaseType);
-            schemaRefresher.get().refresh(metaDataManagerPersistService, 
database, logicDataSourceNames, schemaName, databaseType, 
sqlStatementContext.getSqlStatement(), props);
+        Class<?> sqlStatementClass = 
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
+        Optional<MetaDataRefresher> metaDataRefresher = 
TypedSPILoader.findService(MetaDataRefresher.class, sqlStatementClass);
+        if (!metaDataRefresher.isPresent()) {
+            return;
         }
-    }
-    
-    /**
-     * Refresh meta data for federation.
-     *
-     * @param sqlStatementContext SQL statement context
-     */
-    @SuppressWarnings("unchecked")
-    public void refresh(final SQLStatementContext sqlStatementContext) {
-        getFederationMetaDataRefresher(sqlStatementContext).ifPresent(
-                optional -> optional.refresh(metaDataManagerPersistService, 
database, getSchemaName(sqlStatementContext), 
sqlStatementContext.getSqlStatement()));
+        Collection<String> logicDataSourceNames = routeUnits.stream().map(each 
-> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
+        String schemaName = sqlStatementContext instanceof TableAvailable ? 
getSchemaName(sqlStatementContext) : null;
+        DatabaseType databaseType = routeUnits.stream().map(each -> 
database.getResourceMetaData().getStorageUnits().get(each.getDataSourceMapper().getActualName()))
+                
.filter(Objects::nonNull).findFirst().map(StorageUnit::getStorageType).orElseGet(sqlStatementContext::getDatabaseType);
+        metaDataRefresher.get().refresh(metaDataManagerPersistService, 
database, logicDataSourceNames, schemaName, databaseType, 
sqlStatementContext.getSqlStatement(), props);
     }
     
     private String getSchemaName(final SQLStatementContext 
sqlStatementContext) {
@@ -107,27 +108,14 @@ public final class MetaDataRefreshEngine {
     }
     
     /**
-     * SQL statement is federation or not.
-     *
-     * @param sqlStatementContext SQL statement context
-     * @return is federation or not
-     */
-    public boolean isFederation(final SQLStatementContext sqlStatementContext) 
{
-        return getFederationMetaDataRefresher(sqlStatementContext).isPresent();
-    }
-    
-    @SuppressWarnings("rawtypes")
-    private Optional<FederationMetaDataRefresher> 
getFederationMetaDataRefresher(final SQLStatementContext sqlStatementContext) {
-        return TypedSPILoader.findService(FederationMetaDataRefresher.class, 
sqlStatementContext.getSqlStatement().getClass().getSuperclass());
-    }
-    
-    /**
-     * Is refresh meta data required.
+     * Refresh meta data for federation.
      *
      * @param sqlStatementContext SQL statement context
-     * @return is refresh meta data required or not
      */
-    public static boolean isRefreshMetaDataRequired(final SQLStatementContext 
sqlStatementContext) {
-        return 
DDL_STATEMENT_CLASSES.contains(sqlStatementContext.getSqlStatement().getClass().getSuperclass());
+    @SuppressWarnings("unchecked")
+    public void refreshFederation(final SQLStatementContext 
sqlStatementContext) {
+        Class<?> sqlStatementClass = 
sqlStatementContext.getSqlStatement().getClass().getSuperclass();
+        TypedSPILoader.findService(FederationMetaDataRefresher.class, 
sqlStatementClass).ifPresent(
+                optional -> optional.refresh(metaDataManagerPersistService, 
database, getSchemaName(sqlStatementContext), 
sqlStatementContext.getSqlStatement()));
     }
 }
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
index 1fa6068d1c5..08fa0840757 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/StandardDatabaseConnector.java
@@ -110,6 +110,8 @@ public final class StandardDatabaseConnector implements 
DatabaseConnector {
     
     private final ProxySQLExecutor proxySQLExecutor;
     
+    private final MetaDataRefreshEngine metaDataRefreshEngine;
+    
     private final Collection<Statement> cachedStatements = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
     
     private final Collection<ResultSet> cachedResultSets = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -131,6 +133,8 @@ public final class StandardDatabaseConnector implements 
DatabaseConnector {
             prepareCursorStatementContext((CursorAvailable) 
sqlStatementContext);
         }
         proxySQLExecutor = new ProxySQLExecutor(driverType, 
databaseConnectionManager, this, queryContext);
+        metaDataRefreshEngine = new MetaDataRefreshEngine(
+                
contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(), 
database, contextManager.getMetaDataContexts().getMetaData().getProps());
     }
     
     private void checkBackendReady(final SQLStatementContext 
sqlStatementContext) {
@@ -179,9 +183,8 @@ public final class StandardDatabaseConnector implements 
DatabaseConnector {
         if (proxySQLExecutor.getSqlFederationEngine().decide(queryContext, 
contextManager.getMetaDataContexts().getMetaData().getGlobalRuleMetaData())) {
             return processExecuteFederation(doExecuteFederation());
         }
-        MetaDataRefreshEngine metaDataRefreshEngine = 
getMetaDataRefreshEngine();
-        if (proxySQLExecutor.getSqlFederationEngine().enabled() && 
metaDataRefreshEngine.isFederation(queryContext.getSqlStatementContext())) {
-            
metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext());
+        if (proxySQLExecutor.getSqlFederationEngine().enabled()) {
+            
metaDataRefreshEngine.refreshFederation(queryContext.getSqlStatementContext());
             return new 
UpdateResponseHeader(queryContext.getSqlStatementContext().getSqlStatement());
         }
         ExecutionContext executionContext = generateExecutionContext();
@@ -233,9 +236,7 @@ public final class StandardDatabaseConnector implements 
DatabaseConnector {
         List<ExecuteResult> executeResults = advancedExecutors.isEmpty()
                 ? proxySQLExecutor.execute(executionContext)
                 : 
advancedExecutors.iterator().next().execute(executionContext, contextManager, 
database, this);
-        if 
(MetaDataRefreshEngine.isRefreshMetaDataRequired(queryContext.getSqlStatementContext()))
 {
-            
getMetaDataRefreshEngine().refresh(queryContext.getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
-        }
+        metaDataRefreshEngine.refresh(queryContext.getSqlStatementContext(), 
executionContext.getRouteContext().getRouteUnits());
         Object executeResultSample = executeResults.iterator().next();
         return executeResultSample instanceof QueryResult
                 ? processExecuteQuery(queryContext.getSqlStatementContext(), 
executeResults.stream().map(QueryResult.class::cast).collect(Collectors.toList()),
 (QueryResult) executeResultSample)
@@ -271,10 +272,6 @@ public final class StandardDatabaseConnector implements 
DatabaseConnector {
         return new QueryResponseHeader(queryHeaders);
     }
     
-    private MetaDataRefreshEngine getMetaDataRefreshEngine() {
-        return new 
MetaDataRefreshEngine(contextManager.getPersistServiceFacade().getMetaDataManagerPersistService(),
 database, contextManager.getMetaDataContexts().getMetaData().getProps());
-    }
-    
     private QueryResponseHeader processExecuteQuery(final SQLStatementContext 
sqlStatementContext, final List<QueryResult> queryResults, final QueryResult 
queryResultSample) throws SQLException {
         queryHeaders = createQueryHeaders(sqlStatementContext, 
queryResultSample);
         mergedResult = mergeQuery(sqlStatementContext, queryResults);

Reply via email to