This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a87def  Realize the lock confirmation process before executing ddl 
(#8509)
4a87def is described below

commit 4a87def4c3fe665c7a6017e8a0c721905a9d8ecb
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Dec 7 13:03:28 2020 +0800

    Realize the lock confirmation process before executing ddl (#8509)
---
 .../core/lock/GovernanceLockStrategy.java          | 41 ++++++++++++++++++++++
 .../governance/core/registry/RegistryCenter.java   | 21 +++++++++++
 .../core/registry/RegistryCenterNode.java          |  9 +++++
 .../shardingsphere/infra/lock/LockStrategy.java    |  7 ++++
 .../infra/lock/StandardLockStrategy.java           |  5 +++
 .../communication/DatabaseCommunicationEngine.java | 19 ++++++++--
 6 files changed, 99 insertions(+), 3 deletions(-)

diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
index fa570e2..62acd86 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
@@ -18,13 +18,20 @@
 package org.apache.shardingsphere.governance.core.lock;
 
 import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
+import 
org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
 import org.apache.shardingsphere.infra.lock.LockStrategy;
 
+import java.util.Collection;
+
 /**
  * Governance lock strategy.
  */
 public final class GovernanceLockStrategy implements LockStrategy {
     
+    private static final int CHECK_RETRY_MAXIMUM = 5;
+    
+    private static final int CHECK_RETRY_INTERVAL_SECONDS = 3;
+    
     private final GovernanceFacade governanceFacade;
     
     public GovernanceLockStrategy(final GovernanceFacade governanceFacade) {
@@ -40,4 +47,38 @@ public final class GovernanceLockStrategy implements 
LockStrategy {
     public void releaseLock() {
         governanceFacade.getLockCenter().releaseGlobalLock();
     }
+    
+    @Override
+    public boolean checkLock() {
+        Collection<String> instanceIds = 
governanceFacade.getRegistryCenter().loadAllInstances();
+        if (instanceIds.isEmpty()) {
+            return true;
+        }
+        return checkOrRetry(instanceIds);
+    }
+    
+    private boolean checkOrRetry(final Collection<String> instanceIds) {
+        for (int i = 0; i < CHECK_RETRY_MAXIMUM; i++) {
+            if (check(instanceIds)) {
+                return true;
+            }
+            try {
+                Thread.sleep(CHECK_RETRY_INTERVAL_SECONDS * 1000L);
+                // CHECKSTYLE:OFF
+            } catch (final InterruptedException ex) {
+                // CHECKSTYLE:ON
+            }
+        }
+        return false;
+    }
+    
+    private boolean check(final Collection<String> instanceIds) {
+        for (String instanceId : instanceIds) {
+            if (!RegistryCenterNodeStatus.LOCKED.toString()
+                    
.equalsIgnoreCase(governanceFacade.getRegistryCenter().loadInstanceData(instanceId)))
 {
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 7565430..a1fa1e2 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -59,6 +59,7 @@ public final class RegistryCenter {
     
     /**
      * Persist instance data.
+     * 
      * @param instanceData instance data
      */
     public void persistInstanceData(final String instanceData) {
@@ -67,6 +68,7 @@ public final class RegistryCenter {
     
     /**
      * Load instance data.
+     * 
      * @return instance data
      */
     public String loadInstanceData() {
@@ -74,6 +76,25 @@ public final class RegistryCenter {
     }
     
     /**
+     * Load instance data.
+     * 
+     * @param instanceId instance id
+     * @return instance data
+     */
+    public String loadInstanceData(final String instanceId) {
+        return repository.get(node.getProxyNodePath(instanceId));
+    }
+    
+    /**
+     * Load all instances.
+     * 
+     * @return collection of all instances
+     */
+    public Collection<String> loadAllInstances() {
+        return repository.getChildrenKeys(node.getProxyNodesPath());
+    }
+    
+    /**
      * Load disabled data sources.
      * 
      * @param schemaName schema name
diff --git 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
index e06b782..48a4850 100644
--- 
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
+++ 
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
@@ -104,4 +104,13 @@ public final class RegistryCenterNode {
         }
         return result;
     }
+    
+    /**
+     * Get proxy nodes path.
+     *
+     * @return proxy nodes path
+     */
+    public String getProxyNodesPath() {
+        return Joiner.on("/").join("", ROOT, PROXY_NODES_NAME);
+    }
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
index 2f67c31..2a5c389 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
@@ -33,4 +33,11 @@ public interface LockStrategy {
      * Release lock.
      */
     void releaseLock();
+    
+    /**
+     * Check lock state.
+     * 
+     * @return true if all instances were locked, else false
+     */
+    boolean checkLock();
 }
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
index a4d4fc8..a8d18ab 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
@@ -44,4 +44,9 @@ public final class StandardLockStrategy implements 
LockStrategy {
         lock.unlock();
         StateContext.switchState(new StateEvent(StateType.OK, true));
     }
+    
+    @Override
+    public boolean checkLock() {
+        return StateContext.getCurrentState() == StateType.LOCK;
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index f21fd20..eb33e07 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -111,9 +111,7 @@ public final class DatabaseCommunicationEngine {
         if (executionContext.getExecutionUnits().isEmpty()) {
             return new 
UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
         }
-        if (needLock(executionContext) && 
!LockContext.getLockStrategy().tryLock()) {
-            throw new LockWaitTimeoutException();
-        }
+        lockForDDL(executionContext);
         proxySQLExecutor.checkExecutePrerequisites(executionContext);
         Collection<ExecuteResult> executeResults = 
proxySQLExecutor.execute(executionContext);
         ExecuteResult executeResultSample = executeResults.iterator().next();
@@ -122,6 +120,15 @@ public final class DatabaseCommunicationEngine {
                 : processExecuteUpdate(executionContext, 
executeResults.stream().map(each -> (UpdateResult) 
each).collect(Collectors.toList()));
     }
     
+    private void lockForDDL(final ExecutionContext executionContext) {
+        if (needLock(executionContext)) {
+            if (!LockContext.getLockStrategy().tryLock()) {
+                throw new LockWaitTimeoutException();
+            }
+            checkLock();
+        }
+    }
+    
     private boolean needLock(final ExecutionContext executionContext) {
         SQLStatement sqlStatement = 
executionContext.getSqlStatementContext().getSqlStatement();
         if (null == sqlStatement) {
@@ -130,6 +137,12 @@ public final class DatabaseCommunicationEngine {
         return SchemaRefresherFactory.newInstance(sqlStatement).isPresent();
     }
     
+    private void checkLock() {
+        if (!LockContext.getLockStrategy().checkLock()) {
+            throw new LockWaitTimeoutException();
+        }
+    }
+    
     private QueryResponseHeader processExecuteQuery(final ExecutionContext 
executionContext, final List<QueryResult> queryResults, final QueryResult 
queryResultSample) throws SQLException {
         queryHeaders = createQueryHeaders(executionContext, queryResultSample);
         mergedResult = mergeQuery(executionContext.getSqlStatementContext(), 
queryResults);

Reply via email to