This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8a167179a08 Add ExclusiveOperation instead of GlobalLock (#36601)
8a167179a08 is described below
commit 8a167179a089f0f9dc80c7d84f0a452014c8dd06
Author: Haoran Meng <[email protected]>
AuthorDate: Wed Sep 17 15:55:56 2025 +0800
Add ExclusiveOperation instead of GlobalLock (#36601)
---
.../PrometheusPluginLifecycleServiceTest.java | 4 +-
.../migration/preparer/MigrationJobPreparer.java | 30 +++++---------
...areLock.java => MigrationPrepareOperation.java} | 6 +--
.../mode/exclusive/ExclusiveOperation.java} | 18 ++++-----
.../exclusive/ExclusiveOperationCallback.java} | 20 ++++-----
.../mode/exclusive/ExclusiveOperatorContext.java | 29 +++++++------
.../mode/exclusive/ExclusiveOperatorEngine.java | 30 ++++++++++----
.../mode/manager/ContextManager.java | 15 ++++---
.../type/exclusive/ExclusiveOperationNodePath.java | 19 +++++----
.../cluster/ClusterContextManagerBuilder.java | 6 +--
.../exclusive/ClusterExclusiveOperatorContext.java | 46 +++++++++++++++++++++
...csLock.java => RefreshStatisticsOperation.java} | 8 ++--
.../statistics/StatisticsRefreshEngine.java | 12 +-----
.../StandaloneContextManagerBuilder.java | 6 +--
.../StandaloneExclusiveOperatorContext.java | 47 ++++++++++++++++++++++
.../transaction/BackendTransactionManager.java | 23 +++++------
...itLock.java => TransactionCommitOperation.java} | 6 +--
.../ral/updatable/lock/LockClusterExecutor.java | 20 ++++-----
...{ClusterLock.java => LockClusterOperation.java} | 8 ++--
.../ral/updatable/lock/UnlockClusterExecutor.java | 22 ++++------
.../backend/connector/ProxySQLExecutorTest.java | 2 +-
.../proxy/backend/context/ProxyContextTest.java | 2 +-
.../distsql/DistSQLQueryBackendHandlerTest.java | 2 +-
.../pipeline/core/util/PipelineContextUtils.java | 3 +-
24 files changed, 229 insertions(+), 155 deletions(-)
diff --git
a/agent/plugins/metrics/type/prometheus/src/test/java/org/apache/shardingsphere/agent/plugin/metrics/prometheus/PrometheusPluginLifecycleServiceTest.java
b/agent/plugins/metrics/type/prometheus/src/test/java/org/apache/shardingsphere/agent/plugin/metrics/prometheus/PrometheusPluginLifecycleServiceTest.java
index 07132930a81..9d8c9e7ec11 100644
---
a/agent/plugins/metrics/type/prometheus/src/test/java/org/apache/shardingsphere/agent/plugin/metrics/prometheus/PrometheusPluginLifecycleServiceTest.java
+++
b/agent/plugins/metrics/type/prometheus/src/test/java/org/apache/shardingsphere/agent/plugin/metrics/prometheus/PrometheusPluginLifecycleServiceTest.java
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatist
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
-import org.apache.shardingsphere.mode.lock.LockContext;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.standalone.workerid.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -74,6 +74,6 @@ class PrometheusPluginLifecycleServiceTest {
ComputeNodeInstanceContext computeNodeInstanceContext = new
ComputeNodeInstanceContext(
new ComputeNodeInstance(mock(InstanceMetaData.class)), new
ModeConfiguration("Standalone", null), new EventBusContext());
computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator());
- return new ContextManager(metaDataContexts,
computeNodeInstanceContext, mock(LockContext.class),
mock(PersistRepository.class));
+ return new ContextManager(metaDataContexts,
computeNodeInstanceContext, mock(PersistRepository.class),
mock(ExclusiveOperatorContext.class));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 6b99daf8ab1..e84e90c7433 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -67,9 +67,7 @@ import
org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
-import org.apache.shardingsphere.mode.lock.LockDefinition;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.parser.rule.SQLParserRule;
import java.sql.SQLException;
@@ -115,26 +113,16 @@ public final class MigrationJobPreparer implements
PipelineJobPreparer<Migration
if (!jobItemManager.getProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
jobItemManager.persistProgress(jobItemContext);
}
- LockDefinition lockDefinition = new GlobalLockDefinition(new
MigrationPrepareLock(jobConfig.getJobId()));
- long startTimeMillis = System.currentTimeMillis();
- if (contextManager.getLockContext().tryLock(lockDefinition, 600 *
1000L)) {
- log.info("Lock success, jobId={}, shardingItem={}, cost {} ms.",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
- try {
- PipelineJobOffsetGovernanceRepository offsetRepository =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
- JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
- if (!offsetInfo.isTargetSchemaTableCreated()) {
- jobItemContext.setStatus(JobStatus.PREPARING);
- jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
- prepareAndCheckTarget(jobItemContext, contextManager);
- offsetRepository.persist(jobId, new JobOffsetInfo(true));
- }
- } finally {
- log.info("Unlock, jobId={}, shardingItem={}, cost {} ms.",
jobId, jobItemContext.getShardingItem(), System.currentTimeMillis() -
startTimeMillis);
- contextManager.getLockContext().unlock(lockDefinition);
+ contextManager.getExclusiveOperatorEngine().operate(new
MigrationPrepareOperation(jobId), 600 * 1000L, () -> {
+ PipelineJobOffsetGovernanceRepository offsetRepository =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey).getJobFacade().getOffset();
+ JobOffsetInfo offsetInfo = offsetRepository.load(jobId);
+ if (!offsetInfo.isTargetSchemaTableCreated()) {
+ jobItemContext.setStatus(JobStatus.PREPARING);
+ jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
+ prepareAndCheckTarget(jobItemContext, contextManager);
+ offsetRepository.persist(jobId, new JobOffsetInfo(true));
}
- } else {
- log.warn("Lock failed, jobId={}, shardingItem={}.", jobId,
jobItemContext.getShardingItem());
- }
+ });
}
private void prepareAndCheckTarget(final MigrationJobItemContext
jobItemContext, final ContextManager contextManager) throws SQLException {
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareOperation.java
similarity index 85%
copy from
kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
copy to
kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareOperation.java
index 3913776721f..02ba105373b 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareOperation.java
@@ -18,13 +18,13 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperation;
/**
- * Migration prepare lock.
+ * Migration prepare exclusive operation.
*/
@RequiredArgsConstructor
-public final class MigrationPrepareLock implements GlobalLock {
+public final class MigrationPrepareOperation implements ExclusiveOperation {
private final String jobId;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperation.java
similarity index 72%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperation.java
index 2d1eaa159d5..523e4fc0457 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperation.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.statistics;
-
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+package org.apache.shardingsphere.mode.exclusive;
/**
- * Statistics lock.
+ * Exclusive operation.
*/
-public final class StatisticsLock implements GlobalLock {
+public interface ExclusiveOperation {
- @Override
- public String getName() {
- return "statistics";
- }
+ /**
+ * Get operation name.
+ *
+ * @return operation name
+ */
+ String getName();
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperationCallback.java
similarity index 72%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperationCallback.java
index 2d1eaa159d5..aa89fb6c9bb 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperationCallback.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.mode.manager.cluster.statistics;
+package org.apache.shardingsphere.mode.exclusive;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+import java.sql.SQLException;
-/**
- * Statistics lock.
- */
-public final class StatisticsLock implements GlobalLock {
+@FunctionalInterface
+public interface ExclusiveOperationCallback {
- @Override
- public String getName() {
- return "statistics";
- }
+ /**
+ * Execute.
+ *
+ * @throws SQLException SQL exception
+ */
+ void execute() throws SQLException;
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperatorContext.java
similarity index 61%
copy from
kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperatorContext.java
index 3913776721f..823a18a9239 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperatorContext.java
@@ -15,21 +15,26 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+package org.apache.shardingsphere.mode.exclusive;
/**
- * Migration prepare lock.
+ * Exclusive operator context.
*/
-@RequiredArgsConstructor
-public final class MigrationPrepareLock implements GlobalLock {
+public interface ExclusiveOperatorContext {
- private final String jobId;
+ /**
+ * Start exclusive operation.
+ *
+ * @param operation exclusive operation
+ * @param timeoutMillis timeout milliseconds
+ * @return is started or not
+ */
+ boolean start(ExclusiveOperation operation, long timeoutMillis);
- @Override
- public String getName() {
- return String.format("migration_prepare_%s", jobId);
- }
+ /**
+ * Stop exclusive operation.
+ *
+ * @param operation exclusive operation
+ */
+ void stop(ExclusiveOperation operation);
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperatorEngine.java
similarity index 51%
copy from
kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperatorEngine.java
index 3913776721f..1bfa21a8a1a 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/exclusive/ExclusiveOperatorEngine.java
@@ -15,21 +15,35 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
+package org.apache.shardingsphere.mode.exclusive;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+
+import java.sql.SQLException;
/**
- * Migration prepare lock.
+ * Exclusive operator engine.
*/
@RequiredArgsConstructor
-public final class MigrationPrepareLock implements GlobalLock {
+public final class ExclusiveOperatorEngine {
- private final String jobId;
+ private final ExclusiveOperatorContext exclusiveOperatorContext;
- @Override
- public String getName() {
- return String.format("migration_prepare_%s", jobId);
+ /**
+ * Operate with exclusive lock.
+ *
+ * @param operation exclusive operation
+ * @param timeoutMillis timeout millis
+ * @param callback callback
+ * @throws SQLException SQL exception
+ */
+ public void operate(final ExclusiveOperation operation, final long
timeoutMillis, final ExclusiveOperationCallback callback) throws SQLException {
+ if (exclusiveOperatorContext.start(operation, timeoutMillis)) {
+ try {
+ callback.execute();
+ } finally {
+ exclusiveOperatorContext.stop(operation);
+ }
+ }
}
}
diff --git
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index 909e7b741a8..08c47de4b15 100644
---
a/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -39,7 +39,8 @@ import
org.apache.shardingsphere.infra.metadata.database.schema.manager.GenericS
import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import
org.apache.shardingsphere.infra.metadata.statistics.builder.ShardingSphereStatisticsFactory;
import org.apache.shardingsphere.infra.rule.builder.global.GlobalRulesBuilder;
-import org.apache.shardingsphere.mode.lock.LockContext;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorContext;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorEngine;
import
org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListenerFactory;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.factory.MetaDataContextsFactory;
@@ -65,8 +66,6 @@ public final class ContextManager implements AutoCloseable {
private final ComputeNodeInstanceContext computeNodeInstanceContext;
- private final LockContext lockContext;
-
private final StateContext stateContext;
private final ExecutorEngine executorEngine;
@@ -75,14 +74,20 @@ public final class ContextManager implements AutoCloseable {
private final MetaDataContextManager metaDataContextManager;
- public ContextManager(final MetaDataContexts metaDataContexts, final
ComputeNodeInstanceContext computeNodeInstanceContext, final LockContext
lockContext, final PersistRepository repository) {
+ private final ExclusiveOperatorContext exclusiveOperatorContext;
+
+ private final ExclusiveOperatorEngine exclusiveOperatorEngine;
+
+ public ContextManager(final MetaDataContexts metaDataContexts, final
ComputeNodeInstanceContext computeNodeInstanceContext,
+ final PersistRepository repository, final
ExclusiveOperatorContext exclusiveOperatorContext) {
this.metaDataContexts = metaDataContexts;
this.computeNodeInstanceContext = computeNodeInstanceContext;
- this.lockContext = lockContext;
+ this.exclusiveOperatorContext = exclusiveOperatorContext;
executorEngine =
ExecutorEngine.createExecutorEngineWithSize(metaDataContexts.getMetaData().getProps().<Integer>getValue(ConfigurationPropertyKey.KERNEL_EXECUTOR_SIZE));
metaDataContextManager = new MetaDataContextManager(metaDataContexts,
computeNodeInstanceContext, repository);
persistServiceFacade = new PersistServiceFacade(repository,
computeNodeInstanceContext.getModeConfiguration(), metaDataContextManager);
stateContext = new
StateContext(persistServiceFacade.getStateService().load());
+ exclusiveOperatorEngine = new
ExclusiveOperatorEngine(exclusiveOperatorContext);
ContextManagerLifecycleListenerFactory.getListeners(this).forEach(each
-> each.onInitialized(this));
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/exclusive/ExclusiveOperationNodePath.java
similarity index 68%
rename from
kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
rename to
mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/exclusive/ExclusiveOperationNodePath.java
index 3913776721f..3baf2d0d141 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationPrepareLock.java
+++
b/mode/node/src/main/java/org/apache/shardingsphere/mode/node/path/type/exclusive/ExclusiveOperationNodePath.java
@@ -15,21 +15,20 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
+package org.apache.shardingsphere.mode.node.path.type.exclusive;
+import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+import org.apache.shardingsphere.mode.node.path.NodePath;
+import org.apache.shardingsphere.mode.node.path.NodePathEntity;
/**
- * Migration prepare lock.
+ * Exclusive operation node path.
*/
+@NodePathEntity("/exclusive_operation/${name}")
@RequiredArgsConstructor
-public final class MigrationPrepareLock implements GlobalLock {
+@Getter
+public final class ExclusiveOperationNodePath implements NodePath {
- private final String jobId;
-
- @Override
- public String getName() {
- return String.format("migration_prepare_%s", jobId);
- }
+ private final String name;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 40a207b2b39..e72c3533e03 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -27,13 +27,12 @@ import
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
import org.apache.shardingsphere.mode.deliver.DeliverEventSubscriber;
import org.apache.shardingsphere.mode.deliver.DeliverEventSubscriberRegistry;
-import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilder;
import
org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilderParameter;
import
org.apache.shardingsphere.mode.manager.cluster.dispatch.listener.DataChangedEventListenerRegistry;
import
org.apache.shardingsphere.mode.manager.cluster.exception.MissingRequiredClusterRepositoryConfigurationException;
-import org.apache.shardingsphere.mode.manager.cluster.lock.ClusterLockContext;
+import
org.apache.shardingsphere.mode.manager.cluster.exclusive.ClusterExclusiveOperatorContext;
import
org.apache.shardingsphere.mode.manager.cluster.persist.facade.ClusterPersistServiceFacade;
import
org.apache.shardingsphere.mode.manager.cluster.workerid.ClusterWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
@@ -59,9 +58,8 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
ComputeNodeInstanceContext computeNodeInstanceContext = new
ComputeNodeInstanceContext(new ComputeNodeInstance(param.getInstanceMetaData(),
param.getLabels()), modeConfig, eventBusContext);
ClusterPersistRepository repository =
getClusterPersistRepository(config, computeNodeInstanceContext);
computeNodeInstanceContext.init(new
ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()));
- LockContext lockContext = new ClusterLockContext(repository);
MetaDataContexts metaDataContexts = new MetaDataContextsFactory(new
MetaDataPersistFacade(repository), computeNodeInstanceContext).create(param);
- ContextManager result = new ContextManager(metaDataContexts,
computeNodeInstanceContext, lockContext, repository);
+ ContextManager result = new ContextManager(metaDataContexts,
computeNodeInstanceContext, repository, new
ClusterExclusiveOperatorContext(repository));
registerOnline(computeNodeInstanceContext, param, result);
new
DeliverEventSubscriberRegistry(result.getComputeNodeInstanceContext().getEventBusContext()).register(createDeliverEventSubscribers(repository));
return result;
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/exclusive/ClusterExclusiveOperatorContext.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/exclusive/ClusterExclusiveOperatorContext.java
new file mode 100644
index 00000000000..8faa9425682
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/exclusive/ClusterExclusiveOperatorContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.exclusive;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperation;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorContext;
+import
org.apache.shardingsphere.mode.node.path.engine.generator.NodePathGenerator;
+import
org.apache.shardingsphere.mode.node.path.type.exclusive.ExclusiveOperationNodePath;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import
org.apache.shardingsphere.mode.repository.cluster.core.lock.DistributedLockHolder;
+
+/**
+ * Cluster exclusive operator context.
+ */
+@RequiredArgsConstructor
+public final class ClusterExclusiveOperatorContext implements
ExclusiveOperatorContext {
+
+ private final ClusterPersistRepository repository;
+
+ @Override
+ public boolean start(final ExclusiveOperation operation, final long
timeoutMillis) {
+ return
DistributedLockHolder.getDistributedLock(NodePathGenerator.toPath(new
ExclusiveOperationNodePath(operation.getName())),
+ repository).tryLock(timeoutMillis);
+ }
+
+ @Override
+ public void stop(final ExclusiveOperation operation) {
+ DistributedLockHolder.getDistributedLock(NodePathGenerator.toPath(new
ExclusiveOperationNodePath(operation.getName())), repository).unlock();
+ }
+}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/RefreshStatisticsOperation.java
similarity index 81%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
rename to
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/RefreshStatisticsOperation.java
index 2d1eaa159d5..b67021e209b 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsLock.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/RefreshStatisticsOperation.java
@@ -17,15 +17,15 @@
package org.apache.shardingsphere.mode.manager.cluster.statistics;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperation;
/**
- * Statistics lock.
+ * Refresh statistics operation.
*/
-public final class StatisticsLock implements GlobalLock {
+public final class RefreshStatisticsOperation implements ExclusiveOperation {
@Override
public String getName() {
- return "statistics";
+ return "refresh_statistics";
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsRefreshEngine.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsRefreshEngine.java
index c540f337465..0adf384e5ce 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsRefreshEngine.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/statistics/StatisticsRefreshEngine.java
@@ -31,9 +31,7 @@ import
org.apache.shardingsphere.infra.metadata.statistics.DatabaseStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.ShardingSphereStatistics;
import
org.apache.shardingsphere.infra.metadata.statistics.collector.DialectDatabaseStatisticsCollector;
import
org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereStatisticsCollector;
-import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
import java.util.Collection;
import java.util.Map;
@@ -68,15 +66,7 @@ public final class StatisticsRefreshEngine {
public void refresh() {
try {
if
(contextManager.getMetaDataContexts().getMetaData().getTemporaryProps().getValue(TemporaryConfigurationPropertyKey.PROXY_META_DATA_COLLECTOR_ENABLED))
{
- LockContext lockContext = contextManager.getLockContext();
- GlobalLockDefinition lockDefinition = new
GlobalLockDefinition(new StatisticsLock());
- if (lockContext.tryLock(lockDefinition, 5000L)) {
- try {
- refreshStatistics();
- } finally {
- lockContext.unlock(lockDefinition);
- }
- }
+ contextManager.getExclusiveOperatorEngine().operate(new
RefreshStatisticsOperation(), 5000L, this::refreshStatistics);
}
cleanStatisticsData();
// CHECKSTYLE:OFF
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
index d13ec498344..3a2e2df2d38 100644
---
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/StandaloneContextManagerBuilder.java
@@ -22,11 +22,10 @@ import
org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
-import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilder;
import
org.apache.shardingsphere.mode.manager.builder.ContextManagerBuilderParameter;
-import
org.apache.shardingsphere.mode.manager.standalone.lock.StandaloneLockContext;
+import
org.apache.shardingsphere.mode.manager.standalone.exclusive.StandaloneExclusiveOperatorContext;
import
org.apache.shardingsphere.mode.manager.standalone.workerid.StandaloneWorkerIdGenerator;
import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
import org.apache.shardingsphere.mode.metadata.factory.MetaDataContextsFactory;
@@ -48,9 +47,8 @@ public final class StandaloneContextManagerBuilder implements
ContextManagerBuil
computeNodeInstanceContext.init(new StandaloneWorkerIdGenerator());
StandalonePersistRepository repository = TypedSPILoader.getService(
StandalonePersistRepository.class, null == repositoryConfig ?
null : repositoryConfig.getType(), null == repositoryConfig ? new Properties()
: repositoryConfig.getProps());
- LockContext lockContext = new StandaloneLockContext();
MetaDataContexts metaDataContexts = new MetaDataContextsFactory(new
MetaDataPersistFacade(repository), computeNodeInstanceContext).create(param);
- return new ContextManager(metaDataContexts,
computeNodeInstanceContext, lockContext, repository);
+ return new ContextManager(metaDataContexts,
computeNodeInstanceContext, repository, new
StandaloneExclusiveOperatorContext());
}
@Override
diff --git
a/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/exclusive/StandaloneExclusiveOperatorContext.java
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/exclusive/StandaloneExclusiveOperatorContext.java
new file mode 100644
index 00000000000..299f120eece
--- /dev/null
+++
b/mode/type/standalone/core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/exclusive/StandaloneExclusiveOperatorContext.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.standalone.exclusive;
+
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperation;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperatorContext;
+import
org.apache.shardingsphere.mode.node.path.engine.generator.NodePathGenerator;
+import
org.apache.shardingsphere.mode.node.path.type.exclusive.ExclusiveOperationNodePath;
+import org.apache.shardingsphere.mode.retry.RetryExecutor;
+
+import java.util.Collection;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * Standalone lock context.
+ */
+public final class StandaloneExclusiveOperatorContext implements
ExclusiveOperatorContext {
+
+ private final Collection<String> exclusiveOperationKeys = new
CopyOnWriteArraySet<>();
+
+ @Override
+ public boolean start(final ExclusiveOperation operation, final long
timeoutMillis) {
+ String operationKey = NodePathGenerator.toPath(new
ExclusiveOperationNodePath(operation.getName()));
+ return new RetryExecutor(timeoutMillis, 50L).execute(arg ->
exclusiveOperationKeys.add(operationKey), null);
+ }
+
+ @Override
+ public void stop(final ExclusiveOperation operation) {
+ String operationKey = NodePathGenerator.toPath(new
ExclusiveOperationNodePath(operation.getName()));
+ exclusiveOperationKeys.remove(operationKey);
+ }
+}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
index e9931ecea92..be78bc50ba1 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/BackendTransactionManager.java
@@ -21,9 +21,6 @@ import
org.apache.shardingsphere.database.connector.core.type.DatabaseType;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import
org.apache.shardingsphere.infra.session.connection.transaction.TransactionConnectionContext;
import org.apache.shardingsphere.infra.spi.type.ordered.OrderedSPILoader;
-import org.apache.shardingsphere.mode.lock.LockContext;
-import org.apache.shardingsphere.mode.lock.LockDefinition;
-import
org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
import
org.apache.shardingsphere.proxy.backend.connector.ProxyDatabaseConnectionManager;
import org.apache.shardingsphere.proxy.backend.connector.TransactionManager;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
@@ -102,17 +99,18 @@ public final class BackendTransactionManager implements
TransactionManager {
return;
}
DatabaseType databaseType =
ProxyContext.getInstance().getDatabaseType();
- LockContext lockContext =
ProxyContext.getInstance().getContextManager().getLockContext();
boolean isNeedLock = isNeedLockWhenCommit();
- LockDefinition lockDefinition = null;
+ if (isNeedLock) {
+ // FIXME if timeout when lock required, TSO not assigned, but
commit will continue, solution is use redis lock in impl to instead of reg
center's lock. #35041
+
ProxyContext.getInstance().getContextManager().getExclusiveOperatorEngine().operate(new
TransactionCommitOperation(), 200L, () -> commit(databaseType));
+ } else {
+ commit(databaseType);
+ }
+ }
+
+ private void commit(final DatabaseType databaseType) throws SQLException {
try {
// FIXME if timeout when lock required, TSO not assigned, but
commit will continue, solution is use redis lock in impl to instead of reg
center's lock. #35041
- if (isNeedLock) {
- lockDefinition = new GlobalLockDefinition(new
TransactionCommitLock());
- if (!lockContext.tryLock(lockDefinition, 200L)) {
- return;
- }
- }
for (Entry<ShardingSphereRule, TransactionHook> entry :
transactionHooks.entrySet()) {
entry.getValue().beforeCommit(entry.getKey(), databaseType,
connection.getCachedConnections().values(), getTransactionContext());
}
@@ -125,9 +123,6 @@ public final class BackendTransactionManager implements
TransactionManager {
for (Entry<ShardingSphereRule, TransactionHook> entry :
transactionHooks.entrySet()) {
entry.getValue().afterCommit(entry.getKey(), databaseType,
connection.getCachedConnections().values(), getTransactionContext());
}
- if (isNeedLock) {
- lockContext.unlock(lockDefinition);
- }
for (Connection each : connection.getCachedConnections().values())
{
ConnectionSavepointManager.getInstance().transactionFinished(each);
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/TransactionCommitLock.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/TransactionCommitOperation.java
similarity index 84%
rename from
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/TransactionCommitLock.java
rename to
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/TransactionCommitOperation.java
index 1ccbba5f395..be28a02eef5 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/TransactionCommitLock.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/connector/jdbc/transaction/TransactionCommitOperation.java
@@ -17,12 +17,12 @@
package org.apache.shardingsphere.proxy.backend.connector.jdbc.transaction;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperation;
/**
- * Transaction commit lock.
+ * Transaction commit operation.
*/
-public final class TransactionCommitLock implements GlobalLock {
+public final class TransactionCommitOperation implements ExclusiveOperation {
@Override
public String getName() {
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/LockClusterExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/LockClusterExecutor.java
index c36d02153f3..9537a0161aa 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/LockClusterExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/LockClusterExecutor.java
@@ -24,13 +24,13 @@ import
org.apache.shardingsphere.infra.algorithm.core.exception.MissingRequiredA
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.exception.external.sql.identifier.SQLExceptionIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.lock.exception.LockedClusterException;
-import
org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.state.ShardingSphereState;
import org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;
+import java.sql.SQLException;
+
/**
* Lock cluster executor.
*/
@@ -38,20 +38,14 @@ import
org.apache.shardingsphere.proxy.backend.lock.spi.ClusterLockStrategy;
public final class LockClusterExecutor implements
DistSQLUpdateExecutor<LockClusterStatement> {
@Override
- public void executeUpdate(final LockClusterStatement sqlStatement, final
ContextManager contextManager) {
+ public void executeUpdate(final LockClusterStatement sqlStatement, final
ContextManager contextManager) throws SQLException {
checkState(contextManager);
checkAlgorithm(sqlStatement);
- LockContext lockContext = contextManager.getLockContext();
- GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new
ClusterLock());
long timeoutMillis = sqlStatement.getTimeoutMillis().orElse(3000L);
- if (lockContext.tryLock(lockDefinition, timeoutMillis)) {
- try {
- checkState(contextManager);
- TypedSPILoader.getService(ClusterLockStrategy.class,
sqlStatement.getLockStrategy().getName()).lock();
- } finally {
- lockContext.unlock(lockDefinition);
- }
- }
+ contextManager.getExclusiveOperatorEngine().operate(new
LockClusterOperation(), timeoutMillis, () -> {
+ checkState(contextManager);
+ TypedSPILoader.getService(ClusterLockStrategy.class,
sqlStatement.getLockStrategy().getName()).lock();
+ });
}
private void checkState(final ContextManager contextManager) {
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/ClusterLock.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/LockClusterOperation.java
similarity index 82%
rename from
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/ClusterLock.java
rename to
proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/LockClusterOperation.java
index 8a578e748f7..6109daaa4f9 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/ClusterLock.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/LockClusterOperation.java
@@ -17,15 +17,15 @@
package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable.lock;
-import org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLock;
+import org.apache.shardingsphere.mode.exclusive.ExclusiveOperation;
/**
- * Cluster lock.
+ * Lock cluster operation.
*/
-public final class ClusterLock implements GlobalLock {
+public final class LockClusterOperation implements ExclusiveOperation {
@Override
public String getName() {
- return "cluster_lock";
+ return "lock_cluster";
}
}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/UnlockClusterExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/UnlockClusterExecutor.java
index 53959306dbd..a50c3bdb1fc 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/UnlockClusterExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/lock/UnlockClusterExecutor.java
@@ -21,12 +21,12 @@ import
org.apache.shardingsphere.distsql.handler.engine.update.DistSQLUpdateExec
import
org.apache.shardingsphere.distsql.handler.required.DistSQLExecutorClusterModeRequired;
import
org.apache.shardingsphere.distsql.statement.type.ral.updatable.UnlockClusterStatement;
import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.mode.lock.LockContext;
import org.apache.shardingsphere.mode.manager.ContextManager;
import
org.apache.shardingsphere.mode.manager.cluster.lock.exception.NotLockedClusterException;
-import
org.apache.shardingsphere.mode.manager.cluster.lock.global.GlobalLockDefinition;
import org.apache.shardingsphere.mode.state.ShardingSphereState;
+import java.sql.SQLException;
+
/**
* Unlock cluster executor.
*/
@@ -34,20 +34,14 @@ import
org.apache.shardingsphere.mode.state.ShardingSphereState;
public final class UnlockClusterExecutor implements
DistSQLUpdateExecutor<UnlockClusterStatement> {
@Override
- public void executeUpdate(final UnlockClusterStatement sqlStatement, final
ContextManager contextManager) {
+ public void executeUpdate(final UnlockClusterStatement sqlStatement, final
ContextManager contextManager) throws SQLException {
checkState(contextManager);
- LockContext lockContext = contextManager.getLockContext();
- GlobalLockDefinition lockDefinition = new GlobalLockDefinition(new
ClusterLock());
long timeoutMillis = sqlStatement.getTimeoutMillis().orElse(3000L);
- if (lockContext.tryLock(lockDefinition, timeoutMillis)) {
- try {
- checkState(contextManager);
-
contextManager.getPersistServiceFacade().getStateService().update(ShardingSphereState.OK);
- // TODO unlock snapshot info if locked
- } finally {
- lockContext.unlock(lockDefinition);
- }
- }
+ contextManager.getExclusiveOperatorEngine().operate(new
LockClusterOperation(), timeoutMillis, () -> {
+ checkState(contextManager);
+
contextManager.getPersistServiceFacade().getStateService().update(ShardingSphereState.OK);
+ // TODO unlock snapshot info if locked
+ });
}
private void checkState(final ContextManager contextManager) {
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutorTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutorTest.java
index 43038cbb4df..4fc611c35be 100644
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutorTest.java
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/connector/ProxySQLExecutorTest.java
@@ -108,7 +108,7 @@ class ProxySQLExecutorTest {
ComputeNodeInstanceContext computeNodeInstanceContext =
mock(ComputeNodeInstanceContext.class);
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
ContextManager contextManager = new ContextManager(new
MetaDataContexts(metaData,
- ShardingSphereStatisticsFactory.create(metaData, new
ShardingSphereStatistics())), computeNodeInstanceContext, mock(),
mock(PersistRepository.class, RETURNS_DEEP_STUBS));
+ ShardingSphereStatisticsFactory.create(metaData, new
ShardingSphereStatistics())), computeNodeInstanceContext,
mock(PersistRepository.class, RETURNS_DEEP_STUBS), mock());
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
}
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/context/ProxyContextTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/context/ProxyContextTest.java
index 271905bbeb0..75acaece75c 100644
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/context/ProxyContextTest.java
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/context/ProxyContextTest.java
@@ -72,7 +72,7 @@ class ProxyContextTest {
void assertInit() {
ShardingSphereMetaData metaData = new ShardingSphereMetaData();
MetaDataContexts metaDataContexts = new MetaDataContexts(metaData, new
ShardingSphereStatistics());
- ProxyContext.init(new ContextManager(metaDataContexts,
mock(ComputeNodeInstanceContext.class, RETURNS_DEEP_STUBS), mock(),
mock(PersistRepository.class)));
+ ProxyContext.init(new ContextManager(metaDataContexts,
mock(ComputeNodeInstanceContext.class, RETURNS_DEEP_STUBS),
mock(PersistRepository.class), mock()));
assertThat(ProxyContext.getInstance().getContextManager().getStateContext(),
is(ProxyContext.getInstance().getContextManager().getStateContext()));
assertThat(ProxyContext.getInstance().getContextManager().getStateContext().getState(),
is(ShardingSphereState.OK));
assertThat(ProxyContext.getInstance().getContextManager().getMetaDataContexts(),
is(ProxyContext.getInstance().getContextManager().getMetaDataContexts()));
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/DistSQLQueryBackendHandlerTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/DistSQLQueryBackendHandlerTest.java
index 3508dcadbb7..5b56ac95ac9 100644
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/DistSQLQueryBackendHandlerTest.java
+++
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/distsql/DistSQLQueryBackendHandlerTest.java
@@ -76,7 +76,7 @@ class DistSQLQueryBackendHandlerTest {
when(connectionSession.getUsedDatabaseName()).thenReturn("unknown");
ComputeNodeInstanceContext computeNodeInstanceContext =
mock(ComputeNodeInstanceContext.class);
when(computeNodeInstanceContext.getModeConfiguration()).thenReturn(mock(ModeConfiguration.class));
- ContextManager contextManager = new ContextManager(metaDataContexts,
computeNodeInstanceContext, mock(), mock(PersistRepository.class));
+ ContextManager contextManager = new ContextManager(metaDataContexts,
computeNodeInstanceContext, mock(PersistRepository.class), mock());
when(ProxyContext.getInstance().getContextManager()).thenReturn(contextManager);
assertThrows(UnknownDatabaseException.class, () -> new
DistSQLQueryBackendHandler(mock(ExportDatabaseConfigurationStatement.class,
RETURNS_DEEP_STUBS), connectionSession).execute());
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 6e79e24d4c5..f7ffb933a15 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -113,7 +113,8 @@ public final class PipelineContextUtils {
(ClusterPersistRepositoryConfiguration)
contextManager.getComputeNodeInstanceContext().getModeConfiguration().getRepository());
MetaDataContexts metaDataContexts =
renewMetaDataContexts(contextManager.getMetaDataContexts(), new
MetaDataPersistFacade(persistRepository, true));
PipelineContextManager.putContext(contextKey,
- new ContextManager(metaDataContexts,
contextManager.getComputeNodeInstanceContext(),
contextManager.getLockContext(),
contextManager.getPersistServiceFacade().getRepository()));
+ new ContextManager(metaDataContexts,
contextManager.getComputeNodeInstanceContext(),
contextManager.getPersistServiceFacade().getRepository(),
+ contextManager.getExclusiveOperatorContext()));
}
@SneakyThrows({ReflectiveOperationException.class, SQLException.class})