This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4a87def Realize the lock confirmation process before executing ddl
(#8509)
4a87def is described below
commit 4a87def4c3fe665c7a6017e8a0c721905a9d8ecb
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Dec 7 13:03:28 2020 +0800
Realize the lock confirmation process before executing ddl (#8509)
---
.../core/lock/GovernanceLockStrategy.java | 41 ++++++++++++++++++++++
.../governance/core/registry/RegistryCenter.java | 21 +++++++++++
.../core/registry/RegistryCenterNode.java | 9 +++++
.../shardingsphere/infra/lock/LockStrategy.java | 7 ++++
.../infra/lock/StandardLockStrategy.java | 5 +++
.../communication/DatabaseCommunicationEngine.java | 19 ++++++++--
6 files changed, 99 insertions(+), 3 deletions(-)
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
index fa570e2..62acd86 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/lock/GovernanceLockStrategy.java
@@ -18,13 +18,20 @@
package org.apache.shardingsphere.governance.core.lock;
import org.apache.shardingsphere.governance.core.facade.GovernanceFacade;
+import
org.apache.shardingsphere.governance.core.registry.RegistryCenterNodeStatus;
import org.apache.shardingsphere.infra.lock.LockStrategy;
+import java.util.Collection;
+
/**
* Governance lock strategy.
*/
public final class GovernanceLockStrategy implements LockStrategy {
+ private static final int CHECK_RETRY_MAXIMUM = 5;
+
+ private static final int CHECK_RETRY_INTERVAL_SECONDS = 3;
+
private final GovernanceFacade governanceFacade;
public GovernanceLockStrategy(final GovernanceFacade governanceFacade) {
@@ -40,4 +47,38 @@ public final class GovernanceLockStrategy implements
LockStrategy {
public void releaseLock() {
governanceFacade.getLockCenter().releaseGlobalLock();
}
+
+ @Override
+ public boolean checkLock() {
+ Collection<String> instanceIds =
governanceFacade.getRegistryCenter().loadAllInstances();
+ if (instanceIds.isEmpty()) {
+ return true;
+ }
+ return checkOrRetry(instanceIds);
+ }
+
+ private boolean checkOrRetry(final Collection<String> instanceIds) {
+ for (int i = 0; i < CHECK_RETRY_MAXIMUM; i++) {
+ if (check(instanceIds)) {
+ return true;
+ }
+ try {
+ Thread.sleep(CHECK_RETRY_INTERVAL_SECONDS * 1000L);
+ // CHECKSTYLE:OFF
+ } catch (final InterruptedException ex) {
+ // CHECKSTYLE:ON
+ }
+ }
+ return false;
+ }
+
+ private boolean check(final Collection<String> instanceIds) {
+ for (String instanceId : instanceIds) {
+ if (!RegistryCenterNodeStatus.LOCKED.toString()
+
.equalsIgnoreCase(governanceFacade.getRegistryCenter().loadInstanceData(instanceId)))
{
+ return false;
+ }
+ }
+ return true;
+ }
}
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
index 7565430..a1fa1e2 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenter.java
@@ -59,6 +59,7 @@ public final class RegistryCenter {
/**
* Persist instance data.
+ *
* @param instanceData instance data
*/
public void persistInstanceData(final String instanceData) {
@@ -67,6 +68,7 @@ public final class RegistryCenter {
/**
* Load instance data.
+ *
* @return instance data
*/
public String loadInstanceData() {
@@ -74,6 +76,25 @@ public final class RegistryCenter {
}
/**
+ * Load instance data.
+ *
+ * @param instanceId instance id
+ * @return instance data
+ */
+ public String loadInstanceData(final String instanceId) {
+ return repository.get(node.getProxyNodePath(instanceId));
+ }
+
+ /**
+ * Load all instances.
+ *
+ * @return collection of all instances
+ */
+ public Collection<String> loadAllInstances() {
+ return repository.getChildrenKeys(node.getProxyNodesPath());
+ }
+
+ /**
* Load disabled data sources.
*
* @param schemaName schema name
diff --git
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
index e06b782..48a4850 100644
---
a/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
+++
b/shardingsphere-governance/shardingsphere-governance-core/src/main/java/org/apache/shardingsphere/governance/core/registry/RegistryCenterNode.java
@@ -104,4 +104,13 @@ public final class RegistryCenterNode {
}
return result;
}
+
+ /**
+ * Get proxy nodes path.
+ *
+ * @return proxy nodes path
+ */
+ public String getProxyNodesPath() {
+ return Joiner.on("/").join("", ROOT, PROXY_NODES_NAME);
+ }
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
index 2f67c31..2a5c389 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/LockStrategy.java
@@ -33,4 +33,11 @@ public interface LockStrategy {
* Release lock.
*/
void releaseLock();
+
+ /**
+ * Check lock state.
+ *
+ * @return true if all instances were locked, else false
+ */
+ boolean checkLock();
}
diff --git
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
index a4d4fc8..a8d18ab 100644
---
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
+++
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/lock/StandardLockStrategy.java
@@ -44,4 +44,9 @@ public final class StandardLockStrategy implements
LockStrategy {
lock.unlock();
StateContext.switchState(new StateEvent(StateType.OK, true));
}
+
+ @Override
+ public boolean checkLock() {
+ return StateContext.getCurrentState() == StateType.LOCK;
+ }
}
diff --git
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
index f21fd20..eb33e07 100644
---
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
+++
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/DatabaseCommunicationEngine.java
@@ -111,9 +111,7 @@ public final class DatabaseCommunicationEngine {
if (executionContext.getExecutionUnits().isEmpty()) {
return new
UpdateResponseHeader(executionContext.getSqlStatementContext().getSqlStatement());
}
- if (needLock(executionContext) &&
!LockContext.getLockStrategy().tryLock()) {
- throw new LockWaitTimeoutException();
- }
+ lockForDDL(executionContext);
proxySQLExecutor.checkExecutePrerequisites(executionContext);
Collection<ExecuteResult> executeResults =
proxySQLExecutor.execute(executionContext);
ExecuteResult executeResultSample = executeResults.iterator().next();
@@ -122,6 +120,15 @@ public final class DatabaseCommunicationEngine {
: processExecuteUpdate(executionContext,
executeResults.stream().map(each -> (UpdateResult)
each).collect(Collectors.toList()));
}
+ private void lockForDDL(final ExecutionContext executionContext) {
+ if (needLock(executionContext)) {
+ if (!LockContext.getLockStrategy().tryLock()) {
+ throw new LockWaitTimeoutException();
+ }
+ checkLock();
+ }
+ }
+
private boolean needLock(final ExecutionContext executionContext) {
SQLStatement sqlStatement =
executionContext.getSqlStatementContext().getSqlStatement();
if (null == sqlStatement) {
@@ -130,6 +137,12 @@ public final class DatabaseCommunicationEngine {
return SchemaRefresherFactory.newInstance(sqlStatement).isPresent();
}
+ private void checkLock() {
+ if (!LockContext.getLockStrategy().checkLock()) {
+ throw new LockWaitTimeoutException();
+ }
+ }
+
private QueryResponseHeader processExecuteQuery(final ExecutionContext
executionContext, final List<QueryResult> queryResults, final QueryResult
queryResultSample) throws SQLException {
queryHeaders = createQueryHeaders(executionContext, queryResultSample);
mergedResult = mergeQuery(executionContext.getSqlStatementContext(),
queryResults);