This is an automated email from the ASF dual-hosted git repository.
HoustonPutman pushed a commit to branch branch_10x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_10x by this push:
new b5ccd6b3cdb SOLR-18011: Allow locked Admin APIs to call other locked Admin APIs (#3916)
b5ccd6b3cdb is described below
commit b5ccd6b3cdbd7a74526c658b673412bd2b300c34
Author: Houston Putman <[email protected]>
AuthorDate: Tue Apr 21 10:48:53 2026 -0700
SOLR-18011: Allow locked Admin APIs to call other locked Admin APIs (#3916)
(cherry picked from commit 7ab7a510584c896aedee101dd07047db61a6fcfd)
---
changelog/unreleased/solr-18011-locking-update.yml | 9 +
.../src/java/org/apache/solr/api/V2HttpCall.java | 6 +
.../cloud/DistributedCollectionLockFactory.java | 4 +-
.../org/apache/solr/cloud/DistributedLock.java | 4 +
.../apache/solr/cloud/DistributedMultiLock.java | 20 ++-
.../src/java/org/apache/solr/cloud/LockTree.java | 117 ++++++++++--
.../cloud/OverseerConfigSetMessageHandler.java | 22 ++-
.../apache/solr/cloud/OverseerMessageHandler.java | 9 +-
.../apache/solr/cloud/OverseerTaskProcessor.java | 31 +++-
.../cloud/ZkDistributedCollectionLockFactory.java | 6 +-
.../cloud/ZkDistributedConfigSetLockFactory.java | 2 +-
.../org/apache/solr/cloud/ZkDistributedLock.java | 94 ++++++++--
.../solr/cloud/ZkDistributedLockFactory.java | 18 +-
.../cloud/api/collections/AdminCmdContext.java | 36 +++-
.../api/collections/CollectionApiLockFactory.java | 31 +++-
.../api/collections/CollectionCommandContext.java | 1 +
.../api/collections/CollectionHandlingUtils.java | 20 ++-
...istributedCollectionConfigSetCommandRunner.java | 1 +
.../OverseerCollectionMessageHandler.java | 15 +-
.../solr/handler/admin/CollectionsHandler.java | 7 +-
.../solr/handler/admin/RebalanceLeaders.java | 4 +-
.../solr/handler/admin/api/AdminAPIBase.java | 4 +-
.../solr/handler/component/HttpShardHandler.java | 3 +
.../java/org/apache/solr/servlet/HttpSolrCall.java | 5 +
.../OverseerCollectionConfigSetProcessorTest.java | 108 +++++++++++
.../test/org/apache/solr/cloud/TestLockTree.java | 131 +++++++++++++-
.../apache/solr/cloud/ZkDistributedLockTest.java | 30 ++--
.../api/collections/CollectionApiLockingTest.java | 200 +++++++++++++++++++++
.../solr/common/params/CollectionAdminParams.java | 2 +
.../solr/common/params/CollectionParams.java | 6 +-
30 files changed, 860 insertions(+), 86 deletions(-)
diff --git a/changelog/unreleased/solr-18011-locking-update.yml
b/changelog/unreleased/solr-18011-locking-update.yml
new file mode 100644
index 00000000000..7d2ad4cb472
--- /dev/null
+++ b/changelog/unreleased/solr-18011-locking-update.yml
@@ -0,0 +1,9 @@
+# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
+title: Allow locked Admin APIs to call other locked AdminAPIs. These locked Admin APIs can only call other APIs on the same resource tree (Collection > Shard > Replica) to protect against deadlocks.
+type: changed # added, changed, fixed, deprecated, removed, dependency_update, security, other
+authors:
+ - name: Houston Putman
+ nick: HoustonPutman
+links:
+ - name: SOLR-18011
+ url: https://issues.apache.org/jira/browse/SOLR-18011
diff --git a/solr/core/src/java/org/apache/solr/api/V2HttpCall.java
b/solr/core/src/java/org/apache/solr/api/V2HttpCall.java
index ba2b6fa7af8..df88f992e8e 100644
--- a/solr/core/src/java/org/apache/solr/api/V2HttpCall.java
+++ b/solr/core/src/java/org/apache/solr/api/V2HttpCall.java
@@ -18,6 +18,7 @@
package org.apache.solr.api;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
+import static
org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER;
import static org.apache.solr.servlet.HttpSolrCall.Action.ADMIN;
import static org.apache.solr.servlet.HttpSolrCall.Action.ADMIN_OR_REMOTEPROXY;
import static org.apache.solr.servlet.HttpSolrCall.Action.PROCESS;
@@ -212,6 +213,11 @@ public class V2HttpCall extends HttpSolrCall {
solrReq.getContext().put(CoreContainer.class.getName(), cores);
requestType = AuthorizationContext.RequestType.ADMIN;
action = ADMIN;
+
+ String callingLockId = req.getHeader(CALLING_LOCK_ID_HEADER);
+ if (callingLockId != null && !callingLockId.isBlank()) {
+ solrReq.getContext().put(CALLING_LOCK_ID_HEADER, callingLockId);
+ }
}
protected void parseRequest() throws Exception {
diff --git
a/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java
b/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java
index e49aba6c3f5..f2ab771232e 100644
---
a/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java
+++
b/solr/core/src/java/org/apache/solr/cloud/DistributedCollectionLockFactory.java
@@ -54,6 +54,7 @@ public interface DistributedCollectionLockFactory {
* @param replicaName is ignored and can be {@code null} if {@code level} is
{@link
* org.apache.solr.common.params.CollectionParams.LockLevel#COLLECTION}
or {@link
* org.apache.solr.common.params.CollectionParams.LockLevel#SHARD}
+ * @param callingLockId the lockId from the caller that should be mirrored
by this lock
* @return a lock instance that must be {@link DistributedLock#release()}'ed
in a {@code finally},
* regardless of the lock having been acquired or not.
*/
@@ -62,5 +63,6 @@ public interface DistributedCollectionLockFactory {
CollectionParams.LockLevel level,
String collName,
String shardId,
- String replicaName);
+ String replicaName,
+ String callingLockId);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java
b/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java
index 1929766e86e..c26c5d499f0 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedLock.java
@@ -24,4 +24,8 @@ public interface DistributedLock {
void release();
boolean isAcquired();
+
+ String getLockId();
+
+ boolean isMirroringLock();
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java
b/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java
index 9979c144e84..97be2293399 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedMultiLock.java
@@ -20,7 +20,9 @@ package org.apache.solr.cloud;
import com.google.common.annotations.VisibleForTesting;
import java.lang.invoke.MethodHandles;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.StrUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +48,12 @@ public class DistributedMultiLock {
for (DistributedLock lock : locks) {
log.debug("DistributedMultiLock.waitUntilAcquired. About to wait on lock
{}", lock);
lock.waitUntilAcquired();
- log.debug("DistributedMultiLock.waitUntilAcquired. Acquired lock {}",
lock);
+ if (lock.isMirroringLock()) {
+ log.debug(
+ "DistributedMultiLock.waitUntilAcquired. Mirroring
already-acquired lock {}", lock);
+ } else {
+ log.debug("DistributedMultiLock.waitUntilAcquired. Acquired lock {}",
lock);
+ }
}
}
@@ -70,6 +77,17 @@ public class DistributedMultiLock {
return true;
}
+ public String getLockId() {
+ return
locks.stream().map(DistributedLock::getLockId).collect(Collectors.joining(","));
+ }
+
+ public static List<String> splitLockIds(String lockIds) {
+ if (StrUtils.isBlank(lockIds)) {
+ return List.of();
+ }
+ return List.of(lockIds.split(","));
+ }
+
@VisibleForTesting
public int getCountInternalLocks() {
return locks.size();
diff --git a/solr/core/src/java/org/apache/solr/cloud/LockTree.java
b/solr/core/src/java/org/apache/solr/cloud/LockTree.java
index e8d96d4f2cd..6bb432abd8e 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LockTree.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LockTree.java
@@ -21,8 +21,11 @@ import java.lang.invoke.MethodHandles;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import java.util.UUID;
import org.apache.solr.cloud.OverseerMessageHandler.Lock;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.LockLevel;
import org.apache.solr.common.util.StrUtils;
@@ -38,20 +41,36 @@ public class LockTree {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Node root = new Node(null, LockLevel.CLUSTER, null);
+ public final Map<String, Lock> allLocks = new HashMap<>();
+
private class LockImpl implements Lock {
final Node node;
+ final String id;
LockImpl(Node node) {
this.node = node;
+ this.id = UUID.randomUUID().toString();
}
@Override
public void unlock() {
synchronized (LockTree.this) {
- node.unlock(this);
+ if (node.unlock(this)) {
+ allLocks.remove(id);
+ }
}
}
+ @Override
+ public String id() {
+ return id;
+ }
+
+ @Override
+ public boolean validateSubpath(int lockLevel, List<String> path) {
+ return node.validateSubpath(lockLevel, path);
+ }
+
@Override
public String toString() {
return StrUtils.join(node.constructPath(new ArrayDeque<>()), '/');
@@ -71,12 +90,43 @@ public class LockTree {
public class Session {
private SessionNode root = new SessionNode(LockLevel.CLUSTER);
- public Lock lock(CollectionParams.CollectionAction action, List<String>
path) {
+ public Lock lock(
+ CollectionParams.CollectionAction action, List<String> path, String
callingLockId) {
if (action.lockLevel == LockLevel.NONE) return FREELOCK;
+ Node startingNode = LockTree.this.root;
+ SessionNode startingSession = root;
+
+ // If a callingLockId was passed in, validate it with the current lock
path, and only start
+ // locking below the calling lock
+ Lock callingLock = StrUtils.isBlank(callingLockId) ? null :
allLocks.get(callingLockId);
+ log.debug("Calling lock id: {}, level: {}", callingLockId, callingLock);
+ boolean reuseCurrentLock = false;
+ if (callingLock != null) {
+ if (callingLock.validateSubpath(action.lockLevel.getHeight(), path)) {
+ startingNode = ((LockImpl) callingLock).node;
+ startingSession =
startingSession.find(startingNode.level.getHeight(), path);
+ if (startingSession == null) {
+ startingSession = root;
+ }
+ reuseCurrentLock = true;
+ } else {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ String.format(
+ Locale.ROOT,
+ "Cannot lock an action under a different path than the
calling action. Calling action lock path: %s, Requested action lock path: %s",
+ callingLock,
+ String.join("/", path)));
+ }
+ }
synchronized (LockTree.this) {
- if (root.isBusy(action.lockLevel, path)) return null;
- Lock lockObject = LockTree.this.root.lock(action.lockLevel, path);
- if (lockObject == null) root.markBusy(action.lockLevel, path);
+ if (startingSession.isBusy(action.lockLevel, path)) return null;
+ Lock lockObject = startingNode.lock(action.lockLevel, path,
reuseCurrentLock);
+ if (lockObject == null) {
+ startingSession.markBusy(action.lockLevel, path);
+ } else {
+ allLocks.put(lockObject.id(), lockObject);
+ }
return lockObject;
}
}
@@ -125,6 +175,18 @@ public class LockTree {
return false;
}
}
+
+ SessionNode find(int lockLevel, List<String> path) {
+ if (level.getHeight() == lockLevel) {
+ return this;
+ } else if (level.getHeight() < lockLevel
+ && kids != null
+ && kids.containsKey(path.get(level.getHeight()))) {
+ return kids.get(path.get(level.getHeight())).find(lockLevel, path);
+ } else {
+ return null;
+ }
+ }
}
public Session getSession() {
@@ -135,6 +197,7 @@ public class LockTree {
final String name;
final Node mom;
final LockLevel level;
+ int refCount = 0;
HashMap<String, Node> children = new HashMap<>();
LockImpl myLock;
@@ -151,30 +214,49 @@ public class LockTree {
return false;
}
- void unlock(LockImpl lockObject) {
+ boolean unlock(LockImpl lockObject) {
+ if (--refCount > 0) {
+ return false;
+ }
if (myLock == lockObject) myLock = null;
else {
log.info("Unlocked multiple times : {}", lockObject);
}
+ return true;
}
- Lock lock(LockLevel lockLevel, List<String> path) {
- if (myLock != null) return null; // I'm already locked. no need to go
any further
+ Lock lock(LockLevel lockLevel, List<String> path, boolean
reuseCurrentLock) {
+ if (myLock != null && !reuseCurrentLock) {
+ // I'm already locked. no need to go any further
+ return null;
+ }
if (lockLevel == level) {
// lock is supposed to be acquired at this level
+ if (myLock != null && reuseCurrentLock) {
+ // I am already locked, and I want to be re-used
+ refCount++;
+ return myLock;
+ }
// If I am locked or any of my children or grandchildren are locked
// it is not possible to acquire a lock
if (isLocked()) return null;
+ refCount++;
return myLock = new LockImpl(this);
} else {
String childName = path.get(level.getHeight());
Node child = children.get(childName);
if (child == null)
children.put(childName, child = new Node(childName,
level.getChild(), this));
- return child.lock(lockLevel, path);
+ return child.lock(lockLevel, path, false);
}
}
+ boolean validateSubpath(int lockLevel, List<String> path) {
+ return level.getHeight() <= lockLevel
+ && (level.getHeight() == 0 || name.equals(path.get(level.getHeight()
- 1)))
+ && (mom == null || mom.validateSubpath(lockLevel, path));
+ }
+
ArrayDeque<String> constructPath(ArrayDeque<String> collect) {
if (name != null) collect.addFirst(name);
if (mom != null) mom.constructPath(collect);
@@ -182,5 +264,20 @@ public class LockTree {
}
}
- static final Lock FREELOCK = () -> {};
+ static final String FREELOCK_ID = "-1";
+ static final Lock FREELOCK =
+ new Lock() {
+ @Override
+ public void unlock() {}
+
+ @Override
+ public String id() {
+ return FREELOCK_ID;
+ }
+
+ @Override
+ public boolean validateSubpath(int lockLevel, List<String> path) {
+ return false;
+ }
+ };
}
diff --git
a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
index 0bf454a0642..750fc62effa 100644
---
a/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
+++
b/solr/core/src/java/org/apache/solr/cloud/OverseerConfigSetMessageHandler.java
@@ -21,6 +21,7 @@ import static org.apache.solr.common.params.CommonParams.NAME;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
@@ -61,7 +62,7 @@ public class OverseerConfigSetMessageHandler implements
OverseerMessageHandler {
}
@Override
- public OverseerSolrResponse processMessage(ZkNodeProps message, String
operation) {
+ public OverseerSolrResponse processMessage(ZkNodeProps message, String
operation, Lock lock) {
NamedList<Object> results = new NamedList<>();
try {
if (!operation.startsWith(CONFIGSETS_ACTION_PREFIX)) {
@@ -113,11 +114,26 @@ public class OverseerConfigSetMessageHandler implements
OverseerMessageHandler {
}
@Override
- public Lock lockTask(ZkNodeProps message, long ignored) {
+ public Lock lockTask(ZkNodeProps message, long ignored, String
callingLockId) {
String configSetName = getTaskKey(message);
if (canExecute(configSetName, message)) {
markExclusiveTask(configSetName, message);
- return () -> unmarkExclusiveTask(configSetName, message);
+ return new Lock() {
+ @Override
+ public void unlock() {
+ unmarkExclusiveTask(configSetName, message);
+ }
+
+ @Override
+ public String id() {
+ return configSetName;
+ }
+
+ @Override
+ public boolean validateSubpath(int lockLevel, List<String> path) {
+ return false;
+ }
+ };
}
return null;
}
diff --git
a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
index 3e369b90731..c01f365a3d7 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerMessageHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cloud;
+import java.util.List;
import org.apache.solr.common.cloud.ZkNodeProps;
/** Interface for processing messages received by an {@link
OverseerTaskProcessor} */
@@ -26,7 +27,7 @@ public interface OverseerMessageHandler {
* @param operation the operation to process
* @return response
*/
- OverseerSolrResponse processMessage(ZkNodeProps message, String operation);
+ OverseerSolrResponse processMessage(ZkNodeProps message, String operation,
Lock lock);
/**
* @return the name of the OverseerMessageHandler
@@ -41,6 +42,10 @@ public interface OverseerMessageHandler {
interface Lock {
void unlock();
+
+ String id();
+
+ boolean validateSubpath(int lockLevel, List<String> path);
}
/**
@@ -48,7 +53,7 @@ public interface OverseerMessageHandler {
*
* @return <code>null</code> if locking is not possible.
*/
- Lock lockTask(ZkNodeProps message, long batchSessionId);
+ Lock lockTask(ZkNodeProps message, long batchSessionId, String
callingLockId);
/**
* @param message the message being processed
diff --git
a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
index 14bc88b583f..30ff9a31996 100644
--- a/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
+++ b/solr/core/src/java/org/apache/solr/cloud/OverseerTaskProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.cloud;
+import static
org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.ID;
@@ -37,11 +38,13 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
@@ -327,8 +330,32 @@ public class OverseerTaskProcessor implements
SolrInfoBean, Runnable, Closeable
workQueue.remove(head, asyncId == null);
continue;
}
+ if (operation == null) {
+ log.error("Msg does not have required {} : {}",
Overseer.QUEUE_OPERATION, message);
+ workQueue.remove(head, asyncId == null);
+ continue;
+ }
+ String callingLockId = message.getStr(CALLING_LOCK_ID_HEADER);
OverseerMessageHandler messageHandler =
selector.selectOverseerMessageHandler(message);
- OverseerMessageHandler.Lock lock =
messageHandler.lockTask(message, batchSessionId);
+ OverseerMessageHandler.Lock lock;
+ try {
+ lock = messageHandler.lockTask(message, batchSessionId,
callingLockId);
+ } catch (SolrException e) {
+ // Lock acquisition can throw if e.g. callingLockId references
an unrelated
+ // action. In that case, fail the task immediately rather than
retrying.
+ log.error(
+ "Error occurred while trying to acquire lock for task [{}]",
head.getId(), e);
+ NamedList<Object> errResp = new NamedList<>();
+ errResp.add("exception", e.getMessage());
+ OverseerSolrResponse response = new
OverseerSolrResponse(errResp);
+ if (asyncId != null) {
+ failureMap.put(asyncId,
OverseerSolrResponseSerializer.serialize(response));
+ } else {
+
head.setBytes(OverseerSolrResponseSerializer.serialize(response));
+ }
+ workQueue.remove(head, asyncId == null);
+ continue;
+ }
if (lock == null) {
if (log.isDebugEnabled()) {
log.debug("Exclusivity check failed for [{}]", message);
@@ -561,7 +588,7 @@ public class OverseerTaskProcessor implements SolrInfoBean,
Runnable, Closeable
if (log.isDebugEnabled()) {
log.debug("Runner processing {}", head.getId());
}
- response = messageHandler.processMessage(message, operation);
+ response = messageHandler.processMessage(message, operation, lock);
} finally {
timerContext.stop();
updateStats(statsName);
diff --git
a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java
b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java
index fe77b18c3f2..098545b6afc 100644
---
a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java
+++
b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedCollectionLockFactory.java
@@ -44,7 +44,8 @@ public class ZkDistributedCollectionLockFactory extends
ZkDistributedLockFactory
CollectionParams.LockLevel level,
String collName,
String shardId,
- String replicaName) {
+ String replicaName,
+ String lockIdToMirror) {
Objects.requireNonNull(collName, "collName can't be null");
if (level != CollectionParams.LockLevel.COLLECTION) {
Objects.requireNonNull(
@@ -56,7 +57,8 @@ public class ZkDistributedCollectionLockFactory extends
ZkDistributedLockFactory
}
String lockPath = getLockPath(level, collName, shardId, replicaName);
- return doCreateLock(isWriteLock, lockPath);
+
+ return doCreateLock(isWriteLock, lockPath, lockIdToMirror);
}
/**
diff --git
a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java
b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java
index 884703a8829..3458ea32215 100644
---
a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java
+++
b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedConfigSetLockFactory.java
@@ -40,7 +40,7 @@ public class ZkDistributedConfigSetLockFactory extends
ZkDistributedLockFactory
Objects.requireNonNull(configSetName, "configSetName can't be null");
String lockPath = getLockPath(configSetName);
- return doCreateLock(isWriteLock, lockPath);
+ return doCreateLock(isWriteLock, lockPath, null);
}
/**
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java
b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java
index 4b594508d2a..30b70f66b82 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLock.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import
org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -41,16 +42,20 @@ abstract class ZkDistributedLock implements DistributedLock
{
static final char LOCK_PREFIX_SUFFIX = '_';
/** Prefix of EPHEMERAL read lock node names */
- static final String READ_LOCK_PREFIX = "R" + LOCK_PREFIX_SUFFIX;
+ static final char READ_LOCK_PREFIX_CHAR = 'R';
+
+ static final String READ_LOCK_PREFIX = "" + READ_LOCK_PREFIX_CHAR +
LOCK_PREFIX_SUFFIX;
/** Prefix of EPHEMERAL write lock node names */
- static final String WRITE_LOCK_PREFIX = "W" + LOCK_PREFIX_SUFFIX;
+ static final char WRITE_LOCK_PREFIX_CHAR = 'W';
+
+ static final String WRITE_LOCK_PREFIX = "" + WRITE_LOCK_PREFIX_CHAR +
LOCK_PREFIX_SUFFIX;
/** Read lock. */
static class Read extends ZkDistributedLock {
- protected Read(SolrZkClient zkClient, String lockPath)
+ protected Read(SolrZkClient zkClient, String lockPath, String
lockIdToMirror)
throws KeeperException, InterruptedException {
- super(zkClient, lockPath, READ_LOCK_PREFIX);
+ super(zkClient, lockPath, READ_LOCK_PREFIX, lockIdToMirror);
}
@Override
@@ -59,13 +64,18 @@ abstract class ZkDistributedLock implements DistributedLock
{
// Lower numbered read locks are ok, they can coexist.
return otherLockName.startsWith(WRITE_LOCK_PREFIX);
}
+
+ @Override
+ boolean canMirrorLock(String lockId) {
+ return true;
+ }
}
/** Write lock. */
static class Write extends ZkDistributedLock {
- protected Write(SolrZkClient zkClient, String lockPath)
+ protected Write(SolrZkClient zkClient, String lockPath, String
lockIdToMirror)
throws KeeperException, InterruptedException {
- super(zkClient, lockPath, WRITE_LOCK_PREFIX);
+ super(zkClient, lockPath, WRITE_LOCK_PREFIX, lockIdToMirror);
}
@Override
@@ -73,6 +83,17 @@ abstract class ZkDistributedLock implements DistributedLock {
// A write lock is blocked by another read or write lock with a lower
sequence number
return true;
}
+
+ @Override
+ boolean canMirrorLock(String lockId) {
+ // Only another Write lock can be mirrored
+ int lockTypeSuffixIndex = lockId.lastIndexOf(LOCK_PREFIX_SUFFIX) - 1;
+ if (lockTypeSuffixIndex < 0) {
+ return false;
+ } else {
+ return lockId.charAt(lockTypeSuffixIndex) == WRITE_LOCK_PREFIX_CHAR;
+ }
+ }
}
private final SolrZkClient zkClient;
@@ -80,22 +101,43 @@ abstract class ZkDistributedLock implements
DistributedLock {
private final String lockNode;
protected final long sequence;
protected volatile boolean released = false;
+ protected final boolean mirrored;
- protected ZkDistributedLock(SolrZkClient zkClient, String lockDir, String
lockNodePrefix)
+ protected ZkDistributedLock(
+ SolrZkClient zkClient, String lockDir, String lockNodePrefix, String
lockIdToMirror)
throws KeeperException, InterruptedException {
this.zkClient = zkClient;
this.lockDir = lockDir;
// Create the SEQUENTIAL EPHEMERAL node. We enter the locking rat race
here. We MUST eventually
// call release() or we block others.
- lockNode =
- zkClient.create(
- lockDir
- + DistributedCollectionConfigSetCommandRunner.ZK_PATH_SEPARATOR
- + lockNodePrefix,
- null,
- CreateMode.EPHEMERAL_SEQUENTIAL);
+ if (StrUtils.isBlank(lockIdToMirror)) {
+ lockNode =
+ zkClient.create(
+ lockDir
+ +
DistributedCollectionConfigSetCommandRunner.ZK_PATH_SEPARATOR
+ + lockNodePrefix,
+ null,
+ CreateMode.EPHEMERAL_SEQUENTIAL);
+ mirrored = false;
+ } else {
+ if (!lockIdToMirror.startsWith(lockDir)) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Requested lock with path: "
+ + lockDir
+ + " cannot mirror the callingLock with id: "
+ + lockIdToMirror);
+ }
+ if (!canMirrorLock(lockIdToMirror)) {
+ throw new SolrException(
+ SolrException.ErrorCode.SERVER_ERROR,
+ "Cannot mirror lock " + lockIdToMirror + " with given lockPrefix:
" + lockNodePrefix);
+ }
+ lockNode = lockIdToMirror;
+ mirrored = true;
+ }
sequence = getSequenceFromNodename(lockNode);
}
@@ -158,8 +200,10 @@ abstract class ZkDistributedLock implements
DistributedLock {
@Override
public void release() {
try {
- zkClient.delete(lockNode, -1);
- released = true;
+ if (!mirrored) {
+ zkClient.delete(lockNode, -1);
+ released = true;
+ }
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
@@ -212,7 +256,11 @@ abstract class ZkDistributedLock implements
DistributedLock {
if (!foundSelf) {
// If this basic assumption doesn't hold with Zookeeper, we're in deep
trouble. And not only
// here.
- throw new SolrException(SERVER_ERROR, "Missing lock node " + lockNode);
+ if (mirrored) {
+ throw new SolrException(SERVER_ERROR, "Missing mirrored lock node " +
lockNode);
+ } else {
+ throw new SolrException(SERVER_ERROR, "Missing lock node " + lockNode);
+ }
}
// Didn't return early on any other blocking lock, means we own it
@@ -240,6 +288,18 @@ abstract class ZkDistributedLock implements
DistributedLock {
return Long.parseLong(lockNode.substring(lockNode.length() -
SEQUENCE_LENGTH));
}
+ @Override
+ public String getLockId() {
+ return lockNode;
+ }
+
+ @Override
+ public boolean isMirroringLock() {
+ return mirrored;
+ }
+
+ abstract boolean canMirrorLock(String lockId);
+
@Override
public String toString() {
return lockNode;
diff --git
a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java
b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java
index 76696dc3942..969e65a6aaa 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkDistributedLockFactory.java
@@ -20,6 +20,7 @@ package org.apache.solr.cloud;
import
org.apache.solr.cloud.api.collections.DistributedCollectionConfigSetCommandRunner;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.common.util.StrUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -33,16 +34,19 @@ abstract class ZkDistributedLockFactory {
this.rootPath = rootPath;
}
- protected DistributedLock doCreateLock(boolean isWriteLock, String lockPath)
{
+ protected DistributedLock doCreateLock(
+ boolean isWriteLock, String lockPath, String lockIdToMirror) {
try {
- // TODO optimize by first attempting to create the ZkDistributedLock
without calling
- // makeLockPath() and only call it if the lock creation fails. This will
be less costly on
- // high contention (and slightly more on low contention)
- makeLockPath(lockPath);
+ if (StrUtils.isBlank(lockIdToMirror)) {
+ // TODO optimize by first attempting to create the ZkDistributedLock
without calling
+ // makeLockPath() and only call it if the lock creation fails. This
will be less costly on
+ // high contention (and slightly more on low contention)
+ makeLockPath(lockPath);
+ }
return isWriteLock
- ? new ZkDistributedLock.Write(zkClient, lockPath)
- : new ZkDistributedLock.Read(zkClient, lockPath);
+ ? new ZkDistributedLock.Write(zkClient, lockPath, lockIdToMirror)
+ : new ZkDistributedLock.Read(zkClient, lockPath, lockIdToMirror);
} catch (KeeperException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
} catch (InterruptedException e) {
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java
index 48b7c475c74..ee5f56deca1 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/AdminCmdContext.java
@@ -17,12 +17,17 @@
package org.apache.solr.cloud.api.collections;
+import static
org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER;
+
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.params.CollectionParams;
+import org.apache.solr.request.SolrQueryRequest;
public class AdminCmdContext {
private final CollectionParams.CollectionAction action;
private final String asyncId;
+ private String lockId;
+ private String callingLockId;
private ClusterState clusterState;
public AdminCmdContext(CollectionParams.CollectionAction action) {
@@ -34,6 +39,13 @@ public class AdminCmdContext {
this.asyncId = asyncId;
}
+ public AdminCmdContext(
+ CollectionParams.CollectionAction action, String asyncId,
SolrQueryRequest req) {
+ this.action = action;
+ this.asyncId = asyncId;
+ this.withCallingLockId((String)
req.getContext().get(CALLING_LOCK_ID_HEADER));
+ }
+
public CollectionParams.CollectionAction getAction() {
return action;
}
@@ -42,6 +54,24 @@ public class AdminCmdContext {
return asyncId;
}
+ public AdminCmdContext withLockId(String lockId) {
+ this.lockId = lockId;
+ return this;
+ }
+
+ public String getLockId() {
+ return lockId;
+ }
+
+ public AdminCmdContext withCallingLockId(String callingLockId) {
+ this.callingLockId = callingLockId;
+ return this;
+ }
+
+ public String getCallingLockId() {
+ return callingLockId;
+ }
+
public ClusterState getClusterState() {
return clusterState;
}
@@ -57,7 +87,9 @@ public class AdminCmdContext {
public AdminCmdContext subRequestContext(
CollectionParams.CollectionAction action, String asyncId) {
- AdminCmdContext nextContext = new AdminCmdContext(action, asyncId);
- return nextContext.withClusterState(clusterState);
+ return new AdminCmdContext(action, asyncId)
+ .withCallingLockId(callingLockId)
+ .withLockId(lockId)
+ .withClusterState(clusterState);
}
}
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java
index 2c5342c008d..62cf0dd65af 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionApiLockFactory.java
@@ -107,22 +107,37 @@ public class CollectionApiLockFactory {
// CollectionParams.LockLevel.COLLECTION;
}
- // The first requested lock is a write one (on the target object for the
action, depending on
- // lock level), then requesting read locks on "higher" levels (collection
> shard > replica here
- // for the level. Note LockLevel "height" is other way around).
- boolean requestWriteLock = true;
+ List<String> callingLockIdList =
+ DistributedMultiLock.splitLockIds(adminCmdContext.getCallingLockId());
+
final CollectionParams.LockLevel[] iterationOrder = {
- CollectionParams.LockLevel.REPLICA,
+ CollectionParams.LockLevel.COLLECTION,
CollectionParams.LockLevel.SHARD,
- CollectionParams.LockLevel.COLLECTION
+ CollectionParams.LockLevel.REPLICA
};
List<DistributedLock> locks = new ArrayList<>(iterationOrder.length);
+ int lockLevelCount = 0;
+
for (CollectionParams.LockLevel level : iterationOrder) {
// This comparison is based on the LockLevel height value that
classifies replica > shard >
// collection.
if (lockLevel.isHigherOrEqual(level)) {
- locks.add(lockFactory.createLock(requestWriteLock, level, collName,
shardId, replicaName));
- requestWriteLock = false;
+ // The last requested lock is either a write or read one (on the
target object for the
+ // action, depending on lock level) depending on what the action is.
All "higher" levels of
+ // locks are reads (collection > shard > replica here for the level.
Note LockLevel "height"
+ // is other way around).
+ boolean requestWriteLock = lockLevel.isEqual(level) &&
adminCmdContext.getAction().isWrite;
+ // Find the matching callingLockId for this level, if it was provided.
All levels must be
+ // provided in order by the caller, so when we run out of
callingLockIds, we are done
+ // mirroring and should start getting new locks.
+ String callingLockIdMatch = null;
+ if (lockLevelCount < callingLockIdList.size()) {
+ callingLockIdMatch = callingLockIdList.get(lockLevelCount++);
+ }
+ DistributedLock lock =
+ lockFactory.createLock(
+ requestWriteLock, level, collName, shardId, replicaName,
callingLockIdMatch);
+ locks.add(lock);
}
}
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
index 20c3e935e29..94dae81bc72 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionCommandContext.java
@@ -58,6 +58,7 @@ public interface CollectionCommandContext {
default ShardRequestTracker asyncRequestTracker(AdminCmdContext
adminCmdContext) {
return new ShardRequestTracker(
adminCmdContext.getAsyncId(),
+ adminCmdContext.getLockId(),
getAdminPath(),
getZkStateReader(),
newShardHandler().getShardHandlerFactory());
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java
index f5e030d81e5..6ba0bc39a6f 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/CollectionHandlingUtils.java
@@ -17,6 +17,7 @@
package org.apache.solr.cloud.api.collections;
+import static
org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER;
import static
org.apache.solr.common.params.CollectionParams.CollectionAction.DELETE;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
@@ -629,7 +630,7 @@ public class CollectionHandlingUtils {
public static ShardRequestTracker syncRequestTracker(
AdminCmdContext adminCmdContext, String adminPath,
CollectionCommandContext ccc) {
- return requestTracker(null, adminPath, ccc);
+ return requestTracker(null, adminCmdContext.getLockId(), adminPath, ccc);
}
public static ShardRequestTracker asyncRequestTracker(
@@ -639,17 +640,23 @@ public class CollectionHandlingUtils {
public static ShardRequestTracker asyncRequestTracker(
AdminCmdContext adminCmdContext, String adminPath,
CollectionCommandContext ccc) {
- return requestTracker(adminCmdContext.getAsyncId(), adminPath, ccc);
+ return requestTracker(
+ adminCmdContext.getAsyncId(), adminCmdContext.getLockId(), adminPath,
ccc);
}
protected static ShardRequestTracker requestTracker(
- String asyncId, String adminPath, CollectionCommandContext ccc) {
+ String asyncId, String lockId, String adminPath,
CollectionCommandContext ccc) {
return new ShardRequestTracker(
- asyncId, adminPath, ccc.getZkStateReader(),
ccc.newShardHandler().getShardHandlerFactory());
+ asyncId,
+ lockId,
+ adminPath,
+ ccc.getZkStateReader(),
+ ccc.newShardHandler().getShardHandlerFactory());
}
public static class ShardRequestTracker {
private final String asyncId;
+ private final String lockId;
private final String adminPath;
private final ZkStateReader zkStateReader;
private final ShardHandlerFactory shardHandlerFactory;
@@ -657,10 +664,12 @@ public class CollectionHandlingUtils {
public ShardRequestTracker(
String asyncId,
+ String lockId,
String adminPath,
ZkStateReader zkStateReader,
ShardHandlerFactory shardHandlerFactory) {
this.asyncId = asyncId;
+ this.lockId = lockId;
this.adminPath = adminPath;
this.zkStateReader = zkStateReader;
this.shardHandlerFactory = shardHandlerFactory;
@@ -733,6 +742,9 @@ public class CollectionHandlingUtils {
sreq.nodeName = nodeName;
sreq.coreNodeName = coreNodeName;
sreq.params = params;
+ if (StrUtils.isNotBlank(lockId)) {
+ sreq.headers = Map.of(CALLING_LOCK_ID_HEADER, lockId);
+ }
shardHandler.submit(sreq, replica, sreq.params);
}
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
index 4cc2c2ca1bf..a626af3c45c 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/DistributedCollectionConfigSetCommandRunner.java
@@ -411,6 +411,7 @@ public class DistributedCollectionConfigSetCommandRunner {
// Block this thread until all required locks are acquired.
lock.waitUntilAcquired();
+ adminCmdContext.withLockId(lock.getLockId());
// Got the lock so moving from submitted to running if we run for an
async task (if
// asyncId is null the asyncTaskTracker calls do nothing).
diff --git
a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
index 793195c40f3..20d06bd4890 100644
---
a/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
+++
b/solr/core/src/java/org/apache/solr/cloud/api/collections/OverseerCollectionMessageHandler.java
@@ -21,6 +21,7 @@ import static
org.apache.solr.cloud.api.collections.CollectionHandlingUtils.logF
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static
org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
@@ -113,7 +114,7 @@ public class OverseerCollectionMessageHandler implements
OverseerMessageHandler,
}
@Override
- public OverseerSolrResponse processMessage(ZkNodeProps message, String
operation) {
+ public OverseerSolrResponse processMessage(ZkNodeProps message, String
operation, Lock lock) {
// sometimes overseer messages have the collection name in 'name' field,
not 'collection'
MDCLoggingContext.setCollection(
message.getStr(COLLECTION_PROP) != null
@@ -128,8 +129,11 @@ public class OverseerCollectionMessageHandler implements
OverseerMessageHandler,
CollectionAction action = getCollectionAction(operation);
CollApiCmds.CollectionApiCommand command =
commandMapper.getActionCommand(action);
if (command != null) {
- AdminCmdContext adminCmdContext = new AdminCmdContext(action,
message.getStr(ASYNC));
- adminCmdContext.withClusterState(cloudManager.getClusterState());
+ AdminCmdContext adminCmdContext =
+ new AdminCmdContext(action, message.getStr(ASYNC))
+ .withLockId(lock.id())
+ .withCallingLockId(message.getStr(CALLING_LOCK_ID_HEADER))
+ .withClusterState(cloudManager.getClusterState());
command.call(adminCmdContext, message, results);
} else {
throw new SolrException(ErrorCode.BAD_REQUEST, "Unknown operation:" +
operation);
@@ -182,7 +186,7 @@ public class OverseerCollectionMessageHandler implements
OverseerMessageHandler,
* because it happens that a lock got released).
*/
@Override
- public Lock lockTask(ZkNodeProps message, long batchSessionId) {
+ public Lock lockTask(ZkNodeProps message, long batchSessionId, String
callingLockId) {
if (sessionId != batchSessionId) {
// this is always called in the same thread.
// Each batch is supposed to have a new taskBatch
@@ -196,7 +200,8 @@ public class OverseerCollectionMessageHandler implements
OverseerMessageHandler,
Arrays.asList(
getTaskKey(message),
message.getStr(ZkStateReader.SHARD_ID_PROP),
- message.getStr(ZkStateReader.REPLICA_PROP)));
+ message.getStr(ZkStateReader.REPLICA_PROP)),
+ callingLockId);
}
@Override
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
index 0fa337f6362..be9f394d26c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java
@@ -34,6 +34,7 @@ import static
org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
+import static
org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER;
import static org.apache.solr.common.params.CollectionAdminParams.COLLECTION;
import static
org.apache.solr.common.params.CollectionAdminParams.CREATE_NODE_SET_PARAM;
import static
org.apache.solr.common.params.CollectionAdminParams.FOLLOW_ALIASES;
@@ -162,6 +163,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.SimpleOrderedMap;
+import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.CloudConfig;
import org.apache.solr.core.CoreContainer;
@@ -319,7 +321,7 @@ public class CollectionsHandler extends RequestHandlerBase
implements Permission
}
AdminCmdContext adminCmdContext =
- new AdminCmdContext(operation.action, req.getParams().get(ASYNC));
+ new AdminCmdContext(operation.action, req.getParams().get(ASYNC), req);
ZkNodeProps zkProps = new ZkNodeProps(props);
final SolrResponse overseerResponse;
@@ -366,6 +368,9 @@ public class CollectionsHandler extends RequestHandlerBase
implements Permission
if (adminCmdContext.getAsyncId() != null &&
!adminCmdContext.getAsyncId().isBlank()) {
additionalProps.put(ASYNC, adminCmdContext.getAsyncId());
}
+ if (StrUtils.isNotBlank(adminCmdContext.getCallingLockId())) {
+ additionalProps.put(CALLING_LOCK_ID_HEADER,
adminCmdContext.getCallingLockId());
+ }
m = m.plus(additionalProps);
if (adminCmdContext.getAsyncId() != null) {
String asyncId = adminCmdContext.getAsyncId();
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
index 26afe771d5f..bff0be76ad3 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/RebalanceLeaders.java
@@ -458,9 +458,9 @@ class RebalanceLeaders {
String asyncId = REBALANCELEADERS.toLower() + "_" + core + "_" +
Math.abs(System.nanoTime());
asyncRequests.add(asyncId);
+ // ignore response; we construct our own
collectionsHandler.submitCollectionApiCommand(
- new AdminCmdContext(REBALANCELEADERS, asyncId),
- new ZkNodeProps(propMap)); // ignore response; we construct our own
+ new AdminCmdContext(REBALANCELEADERS, asyncId, req), new
ZkNodeProps(propMap));
}
// maxWaitSecs - How long are we going to wait? Defaults to 30 seconds.
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java
b/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java
index 3d627c4cfed..faee1e61d72 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/AdminAPIBase.java
@@ -138,7 +138,7 @@ public abstract class AdminAPIBase extends JerseyResource {
ZkNodeProps remoteMessage)
throws Exception {
return submitRemoteMessageAndHandleException(
- response, new AdminCmdContext(action, null), remoteMessage);
+ response, new AdminCmdContext(action, null, solrQueryRequest),
remoteMessage);
}
protected SolrResponse submitRemoteMessageAndHandleAsync(
@@ -149,7 +149,7 @@ public abstract class AdminAPIBase extends JerseyResource {
throws Exception {
var remoteResponse =
submitRemoteMessageAndHandleException(
- response, new AdminCmdContext(action, asyncId), remoteMessage);
+ response, new AdminCmdContext(action, asyncId, solrQueryRequest),
remoteMessage);
if (asyncId != null) {
response.requestId = asyncId;
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
index df0f39ccea5..43165272027 100644
--- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java
@@ -188,6 +188,9 @@ public class HttpShardHandler extends ShardHandler {
params.remove(CommonParams.WT); // use default (currently javabin)
QueryRequest req = createQueryRequest(sreq, params, shard);
req.setMethod(SolrRequest.METHOD.POST);
+ if (sreq.headers != null) {
+ req.addHeaders(sreq.headers);
+ }
SolrRequestInfo requestInfo = SolrRequestInfo.getRequestInfo();
if (requestInfo != null) {
req.setUserPrincipal(requestInfo.getUserPrincipal());
diff --git a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
index cfcc58907e8..3e638f56de9 100644
--- a/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
+++ b/solr/core/src/java/org/apache/solr/servlet/HttpSolrCall.java
@@ -19,6 +19,7 @@ package org.apache.solr.servlet;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP;
+import static
org.apache.solr.common.params.CollectionAdminParams.CALLING_LOCK_ID_HEADER;
import static org.apache.solr.security.AuditEvent.EventType.COMPLETED;
import static org.apache.solr.security.AuditEvent.EventType.ERROR;
import static org.apache.solr.servlet.HttpSolrCall.Action.ADMIN;
@@ -738,6 +739,10 @@ public class HttpSolrCall {
protected void handleAdmin(SolrQueryResponse solrResp) {
SolrCore.preDecorateResponse(solrReq, solrResp);
+ String callingLockId = req.getHeader(CALLING_LOCK_ID_HEADER);
+ if (callingLockId != null && !callingLockId.isBlank()) {
+ solrReq.getContext().put(CALLING_LOCK_ID_HEADER, callingLockId);
+ }
handler.handleRequest(solrReq, solrResp);
SolrCore.postDecorateResponse(handler, solrReq, solrResp);
}
diff --git
a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 371568db869..2608a4e8635 100644
---
a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++
b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
@@ -57,6 +58,7 @@ import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.cloud.Overseer.LeaderStatus;
import org.apache.solr.cloud.OverseerTaskQueue.QueueEvent;
import org.apache.solr.cloud.api.collections.CollectionHandlingUtils;
+import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cluster.placement.PlacementPluginFactory;
import org.apache.solr.cluster.placement.plugins.SimplePlacementFactory;
import org.apache.solr.common.MapWriter;
@@ -74,6 +76,7 @@ import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.CoreAdminParams.CoreAdminAction;
import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectCache;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.common.util.TimeSource;
@@ -1431,4 +1434,109 @@ public class OverseerCollectionConfigSetProcessorTest
extends SolrTestCaseJ4 {
waitForEmptyQueue();
stopProcessor();
}
+
+ /**
+ * Verify that when lockTask throws a SolrException due to a callingLockId
from an unrelated
+ * collection, the async task is properly marked as failed rather than
silently swallowed or
+ * retried forever. This test exercises the real LockTree validation code
path.
+ */
+ @Test
+ public void testLockTaskExceptionFailsAsyncTask() throws Exception {
+ commonMocks(2, false);
+
+ String asyncId = "lock-fail-test-async-123";
+
+ // Create a real OverseerCollectionMessageHandler so we exercise the real
LockTree locking
+ OverseerCollectionMessageHandler collHandler =
+ new OverseerCollectionMessageHandler(
+ zkStateReaderMock,
+ "1234",
+ shardHandlerFactoryMock,
+ ADMIN_PATH,
+ new Stats(),
+ overseerMock,
+ new OverseerNodePrioritizer(
+ zkStateReaderMock, overseerMock, ADMIN_PATH,
shardHandlerFactoryMock));
+
+ // Acquire a lock on collA by calling lockTask directly.
+ // This puts a real lock into the LockTree's allLocks map.
+ ZkNodeProps collAMessage =
+ new ZkNodeProps(
+ Map.of(
+ Overseer.QUEUE_OPERATION,
+ CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+ "name",
+ "collA"));
+ OverseerMessageHandler.Lock collALock = collHandler.lockTask(collAMessage,
1, null);
+ assertNotNull("Should have acquired lock on collA", collALock);
+ String collALockId = collALock.id();
+
+ // Build a selector that always returns our real handler
+ OverseerTaskProcessor.OverseerMessageHandlerSelector selector =
+ new OverseerTaskProcessor.OverseerMessageHandlerSelector() {
+ @Override
+ public void close() {
+ IOUtils.closeQuietly(collHandler);
+ }
+
+ @Override
+ public OverseerMessageHandler
selectOverseerMessageHandler(ZkNodeProps message) {
+ return collHandler;
+ }
+ };
+
+ // Create a processor using the real handler
+ OverseerTaskProcessor processor =
+ new OverseerTaskProcessor(
+ zkStateReaderMock,
+ "1234",
+ new Stats(),
+ selector,
+ mock(OverseerNodePrioritizer.class),
+ workQueueMock,
+ runningMapMock,
+ completedMapMock,
+ failureMapMock,
+ solrMetricsContextMock) {
+ @Override
+ protected LeaderStatus amILeader() {
+ return LeaderStatus.YES;
+ }
+ };
+
+ Thread processorThread = new Thread(processor);
+ processorThread.start();
+
+ try {
+ // Submit an async task for collB, but with collA's lock ID as the
callingLockId.
+ // The real LockTree will find collA's lock, call validateSubpath, and
throw SolrException
+ // because collB != collA.
+ Map<String, Object> propMap =
+ Map.of(
+ Overseer.QUEUE_OPERATION,
+ CollectionParams.CollectionAction.MOCK_COLL_TASK.toLower(),
+ "name",
+ "collB",
+ CollectionAdminParams.CALLING_LOCK_ID_HEADER,
+ collALockId,
+ "async",
+ asyncId);
+ ZkNodeProps props = new ZkNodeProps(propMap);
+ QueueEvent qe = new QueueEvent("lockFailTask", Utils.toJSON(props),
null);
+ queue.add(qe);
+
+ waitForEmptyQueue();
+
+ // Verify the task was put in the failure map
+ verify(failureMapMock, times(1)).put(eq(asyncId), any(byte[].class));
+
+ // Verify the task was NOT put in the running map (it should fail before
reaching that point)
+ verify(runningMapMock, times(0)).put(eq(asyncId), any());
+ } finally {
+ collALock.unlock();
+ processor.close();
+ processorThread.interrupt();
+ processorThread.join(MAX_WAIT_MS);
+ }
+ }
}
diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
index 2604feadd65..f56da34f780 100644
--- a/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
+++ b/solr/core/src/test/org/apache/solr/cloud/TestLockTree.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.OverseerMessageHandler.Lock;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.util.Pair;
import org.slf4j.Logger;
@@ -41,23 +42,26 @@ public class TestLockTree extends SolrTestCaseJ4 {
public void testLocks() throws Exception {
LockTree lockTree = new LockTree();
- Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE,
Arrays.asList("coll1"));
+ Lock coll1Lock =
+ lockTree.getSession().lock(CollectionAction.CREATE,
Arrays.asList("coll1"), null);
assertNotNull(coll1Lock);
assertNull(
"Should not be able to lock coll1/shard1",
lockTree
.getSession()
- .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1",
"shard1")));
+ .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1",
"shard1"), null));
coll1Lock.unlock();
Lock shard1Lock =
lockTree
.getSession()
- .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1",
"shard1"));
+ .lock(CollectionAction.BALANCESHARDUNIQUE, Arrays.asList("coll1",
"shard1"), null);
assertNotNull(shard1Lock);
shard1Lock.unlock();
Lock replica1Lock =
- lockTree.getSession().lock(ADDREPLICAPROP, Arrays.asList("coll1",
"shard1", "core_node2"));
+ lockTree
+ .getSession()
+ .lock(ADDREPLICAPROP, Arrays.asList("coll1", "shard1",
"core_node2"), null);
assertNotNull(replica1Lock);
List<Pair<CollectionAction, List<String>>> operations = new ArrayList<>();
@@ -80,7 +84,7 @@ public class TestLockTree extends SolrTestCaseJ4 {
List<Lock> locks = new CopyOnWriteArrayList<>();
List<Thread> threads = new ArrayList<>();
for (Pair<CollectionAction, List<String>> operation : operations) {
- final Lock lock = session.lock(operation.first(), operation.second());
+ final Lock lock = session.lock(operation.first(), operation.second(),
null);
if (lock != null) {
Thread thread = new Thread(getRunnable(completedOps, operation,
locks, lock));
threads.add(thread);
@@ -106,6 +110,123 @@ public class TestLockTree extends SolrTestCaseJ4 {
}
}
+ public void testCallingLockIdSubLocks() throws Exception {
+ LockTree lockTree = new LockTree();
+ Lock coll1Lock = lockTree.getSession().lock(CollectionAction.CREATE,
List.of("coll1"), null);
+ assertNotNull(coll1Lock);
+
+ // Test sub-locks at the same level
+ assertNull(
+ "Should not be able to lock coll1 without using a callingLockId",
+ lockTree.getSession().lock(CollectionAction.RELOAD, List.of("coll1"),
null));
+ Lock coll1Lock2 =
+ lockTree.getSession().lock(CollectionAction.RELOAD, List.of("coll1"),
coll1Lock.id());
+ assertNotNull(coll1Lock2);
+ coll1Lock2.unlock();
+
+ // Test locks underneath
+ Lock shard1Lock =
+ lockTree
+ .getSession()
+ .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard1"),
coll1Lock.id());
+ assertNotNull(shard1Lock);
+ assertNull(
+ "Should not be able to lock coll1/shard1 since our callingLockId is
only coll1, not shard1",
+ lockTree
+ .getSession()
+ .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard1"),
coll1Lock.id()));
+ Lock shard2Lock =
+ lockTree
+ .getSession()
+ .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard2"),
coll1Lock.id());
+ assertNotNull(shard2Lock);
+ shard2Lock.unlock();
+ shard1Lock.unlock();
+
+ // Test locks 2 underneath
+ Lock replica1Lock =
+ lockTree
+ .getSession()
+ .lock(MOCK_REPLICA_TASK, List.of("coll1", "shard1", "replica1"),
coll1Lock.id());
+ assertNull(
+ "Should not be able to lock coll1/shard1/replica1 since our
callingLockId is only coll1, not replica1, which is already locked",
+ lockTree
+ .getSession()
+ .lock(
+ CollectionAction.MOCK_REPLICA_TASK,
+ List.of("coll1", "shard1", "replica1"),
+ coll1Lock.id()));
+ assertNull(
+ "Should not be able to lock coll1/shard1 since our callingLockId is
only coll1, not shard1, which is locked because of a replica task",
+ lockTree
+ .getSession()
+ .lock(CollectionAction.ADDREPLICA, List.of("coll1", "shard1"),
coll1Lock.id()));
+ assertNotNull(replica1Lock);
+ Lock replica2Lock =
+ lockTree
+ .getSession()
+ .lock(MOCK_REPLICA_TASK, List.of("coll1", "shard1", "replica2"),
coll1Lock.id());
+ assertNotNull(replica2Lock);
+ replica2Lock.unlock();
+ replica1Lock.unlock();
+ coll1Lock.unlock();
+
+ // Test difference at a higher level
+ Lock shard1Lock1 =
+ lockTree
+ .getSession()
+ .lock(CollectionAction.INSTALLSHARDDATA, List.of("coll1",
"shard1"), null);
+ assertNotNull(shard1Lock1);
+ Lock shard1Lock2 =
+ lockTree
+ .getSession()
+ .lock(CollectionAction.INSTALLSHARDDATA, List.of("coll2",
"shard1"), null);
+ assertNotNull(shard1Lock2);
+ assertThrows(
+ "Should not be able to lock coll1/shard1 since our callingLockId is
coll2",
+ SolrException.class,
+ () ->
+ lockTree
+ .getSession()
+ .lock(CollectionAction.SYNCSHARD, List.of("coll1", "shard1"),
shard1Lock2.id()));
+ Lock shard1Lock3 =
+ lockTree
+ .getSession()
+ .lock(CollectionAction.SYNCSHARD, List.of("coll1", "shard1"),
shard1Lock1.id());
+ assertNotNull(shard1Lock3);
+ shard1Lock2.unlock();
+ shard1Lock3.unlock();
+
+ // Test difference at a higher level
+ assertNull(
+ "Should not be able to lock coll1 since we have no callingLockId and
shard1 is already locked. Cannot move up",
+ lockTree.getSession().lock(CollectionAction.RELOAD, List.of("coll1"),
null));
+ assertThrows(
+ "Should not be able to lock coll1 since our callingLockId is
coll1/shard1. Cannot move up",
+ SolrException.class,
+ () ->
+ lockTree
+ .getSession()
+ .lock(CollectionAction.RELOAD, List.of("coll1"),
shard1Lock1.id()));
+
+ // Test an unrelated lock
+ assertThrows(
+ "Should not be able to lock coll2 since our callingLockId is
coll1/shard1. Cannot lock an unrelated resource",
+ SolrException.class,
+ () ->
+ lockTree
+ .getSession()
+ .lock(CollectionAction.CREATE, List.of("coll2"),
shard1Lock1.id()));
+ assertThrows(
+ "Should not be able to lock coll1/shard2 since our callingLockId is
coll1/shard1. Cannot lock an unrelated resource",
+ SolrException.class,
+ () ->
+ lockTree
+ .getSession()
+ .lock(CollectionAction.SYNCSHARD, List.of("coll1", "shard2"),
shard1Lock1.id()));
+ shard1Lock1.unlock();
+ }
+
private Runnable getRunnable(
List<Pair<CollectionAction, List<String>>> completedOps,
Pair<CollectionAction, List<String>> operation,
diff --git
a/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java
b/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java
index 2c767b91fef..e31f042bd09 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkDistributedLockTest.java
@@ -75,17 +75,17 @@ public class ZkDistributedLockTest extends SolrTestCaseJ4 {
// Collection level locks
DistributedLock collRL1 =
factory.createLock(
- false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null);
+ false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null, null);
assertTrue("collRL1 should have been acquired", collRL1.isAcquired());
DistributedLock collRL2 =
factory.createLock(
- false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null);
+ false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null, null);
assertTrue("collRL1 should have been acquired", collRL2.isAcquired());
DistributedLock collWL3 =
factory.createLock(
- true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null);
+ true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null, null);
assertFalse(
"collWL3 should not have been acquired, due to collRL1 and collRL2",
collWL3.isAcquired());
@@ -100,7 +100,7 @@ public class ZkDistributedLockTest extends SolrTestCaseJ4 {
DistributedLock collRL4 =
factory.createLock(
- false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null);
+ false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null, null);
assertFalse(
"collRL4 should not have been acquired, due to collWL3 locking the
collection",
collRL4.isAcquired());
@@ -110,14 +110,14 @@ public class ZkDistributedLockTest extends SolrTestCaseJ4
{
// should see no impact.
DistributedLock shardWL5 =
factory.createLock(
- true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME,
SHARD_NAME, null);
+ true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME,
SHARD_NAME, null, null);
assertTrue(
"shardWL5 should have been acquired, there is no lock on that shard",
shardWL5.isAcquired());
DistributedLock shardWL6 =
factory.createLock(
- true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME,
SHARD_NAME, null);
+ true, CollectionParams.LockLevel.SHARD, COLLECTION_NAME,
SHARD_NAME, null, null);
assertFalse(
"shardWL6 should not have been acquired, shardWL5 is locking that
shard",
shardWL6.isAcquired());
@@ -125,12 +125,22 @@ public class ZkDistributedLockTest extends SolrTestCaseJ4
{
// Get a lock on a Replica. Again this is independent of collection or
shard level
DistributedLock replicaRL7 =
factory.createLock(
- false, CollectionParams.LockLevel.REPLICA, COLLECTION_NAME,
SHARD_NAME, REPLICA_NAME);
+ false,
+ CollectionParams.LockLevel.REPLICA,
+ COLLECTION_NAME,
+ SHARD_NAME,
+ REPLICA_NAME,
+ null);
assertTrue("replicaRL7 should have been acquired",
replicaRL7.isAcquired());
DistributedLock replicaWL8 =
factory.createLock(
- true, CollectionParams.LockLevel.REPLICA, COLLECTION_NAME,
SHARD_NAME, REPLICA_NAME);
+ true,
+ CollectionParams.LockLevel.REPLICA,
+ COLLECTION_NAME,
+ SHARD_NAME,
+ REPLICA_NAME,
+ null);
assertFalse(
"replicaWL8 should not have been acquired, replicaRL7 is read locking
that replica",
replicaWL8.isAcquired());
@@ -164,13 +174,13 @@ public class ZkDistributedLockTest extends SolrTestCaseJ4
{
// Acquiring right away a read lock
DistributedLock readLock =
factory.createLock(
- false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null);
+ false, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null, null);
assertTrue("readLock should have been acquired", readLock.isAcquired());
// And now creating a write lock, that can't be acquired just yet, because
of the read lock
DistributedLock writeLock =
factory.createLock(
- true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null);
+ true, CollectionParams.LockLevel.COLLECTION, COLLECTION_NAME,
null, null, null);
assertFalse("writeLock should not have been acquired",
writeLock.isAcquired());
// Wait for acquisition of the write lock on another thread (and be
notified via a latch)
diff --git
a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java
b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java
index 7f6f7e7daf4..ee0da8853b8 100644
---
a/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java
+++
b/solr/core/src/test/org/apache/solr/cloud/api/collections/CollectionApiLockingTest.java
@@ -17,12 +17,14 @@
package org.apache.solr.cloud.api.collections;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.DistributedMultiLock;
import org.apache.solr.cloud.ZkDistributedCollectionLockFactory;
import org.apache.solr.cloud.ZkTestServer;
+import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.params.CollectionParams;
import org.junit.Test;
@@ -54,6 +56,7 @@ public class CollectionApiLockingTest extends SolrTestCaseJ4 {
monothreadedTests(apiLockFactory);
multithreadedTests(apiLockFactory);
+ testCallingLockIdSubLocks(apiLockFactory);
}
} finally {
server.shutdown();
@@ -229,4 +232,215 @@ public class CollectionApiLockingTest extends SolrTestCaseJ4 {
     assertEquals(
         "we should have been notified that replica lock was acquired", 0, latch.getCount());
   }
+
+  /**
+   * Verifies that a {@code callingLockId} lets a command acquire locks that would otherwise be
+   * blocked by the lock its caller holds, while locks held by anyone else still block it.
+   *
+   * <p>Scenarios covered: re-locking the caller's collection; locking shards and replicas
+   * underneath a held collection lock; reusing a shard lock from a different collection; using a
+   * shard lock to lock its parent collection; and using a lock id for an unrelated collection.
+   *
+   * <p>Invoked explicitly next to {@code monothreadedTests} and {@code multithreadedTests} with a
+   * shared lock factory, rather than being run on its own.
+   *
+   * @param apiLockingHelper factory used to create the collection API locks under test
+   */
+  public void testCallingLockIdSubLocks(CollectionApiLockFactory apiLockingHelper)
+      throws Exception {
+    DistributedMultiLock coll1Lock =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.CREATE), "coll1", null, null);
+    assertEquals("Wrong number of internalLocks", 1, coll1Lock.getCountInternalLocks());
+    assertTrue("Lock should be acquired", coll1Lock.isAcquired());
+
+    // Same level: coll1 can only be locked again when the caller's lock id is passed along
+    DistributedMultiLock coll1Lock2 =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.RELOAD), "coll1", null, null);
+    assertEquals("Wrong number of internalLocks", 1, coll1Lock2.getCountInternalLocks());
+    assertFalse(
+        "Should not be able to lock coll1 without using a callingLockId", coll1Lock2.isAcquired());
+    coll1Lock2.release();
+    coll1Lock2 =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.RELOAD)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            null,
+            null);
+    assertEquals("Wrong number of internalLocks", 1, coll1Lock2.getCountInternalLocks());
+    assertTrue("Should be able to lock coll1 when using a callingLockId", coll1Lock2.isAcquired());
+    coll1Lock2.release();
+
+    // One level down: shards of coll1 can use coll1's lock id, unless another lock holds them
+    DistributedMultiLock shard1Lock =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            "shard1",
+            null);
+    assertEquals("Wrong number of internalLocks", 2, shard1Lock.getCountInternalLocks());
+    assertTrue(
+        "Should be able to lock coll1/shard1 when using a callingLockId on coll1",
+        shard1Lock.isAcquired());
+    DistributedMultiLock shard1Lock2 =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            "shard1",
+            null);
+    assertEquals("Wrong number of internalLocks", 2, shard1Lock2.getCountInternalLocks());
+    assertFalse(
+        "Should not be able to lock coll1/shard1 since our callingLockId is only coll1, not shard1, since shard1 is already locked",
+        shard1Lock2.isAcquired());
+    shard1Lock2.release();
+    DistributedMultiLock shard2Lock =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            "shard2",
+            null);
+    assertEquals("Wrong number of internalLocks", 2, shard2Lock.getCountInternalLocks());
+    assertTrue(
+        "Should be able to lock coll1/shard2 when using a callingLockId on coll1, since shard2 has not been locked yet",
+        shard2Lock.isAcquired());
+    shard2Lock.release();
+    shard1Lock.release();
+
+    // Two levels down: replicas of coll1 follow the same rules, and a held replica lock also
+    // blocks its parent shard
+    DistributedMultiLock replica1Lock =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.MOCK_REPLICA_TASK)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            "shard1",
+            "replica1");
+    assertEquals("Wrong number of internalLocks", 3, replica1Lock.getCountInternalLocks());
+    assertTrue(
+        "Should be able to lock coll1/shard1/replica1 when using a callingLockId on coll1",
+        replica1Lock.isAcquired());
+    DistributedMultiLock replica2Lock =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.MOCK_REPLICA_TASK)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            "shard1",
+            "replica1");
+    assertEquals("Wrong number of internalLocks", 3, replica2Lock.getCountInternalLocks());
+    assertFalse(
+        "Should not be able to lock coll1/shard1/replica1 since our callingLockId is only coll1, not replica1, which is already locked",
+        replica2Lock.isAcquired());
+    replica2Lock.release();
+    shard1Lock =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.ADDREPLICA)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            "shard1",
+            null);
+    assertEquals("Wrong number of internalLocks", 2, shard1Lock.getCountInternalLocks());
+    assertFalse(
+        "Should not be able to lock coll1/shard1 since our callingLockId is only coll1, not shard1, which is locked because of a replica task",
+        shard1Lock.isAcquired());
+    shard1Lock.release();
+    replica2Lock =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.MOCK_REPLICA_TASK)
+                .withCallingLockId(coll1Lock.getLockId()),
+            "coll1",
+            "shard1",
+            "replica2");
+    assertEquals("Wrong number of internalLocks", 3, replica2Lock.getCountInternalLocks());
+    assertTrue(
+        "Should be able to lock coll1/shard1/replica2 when using a callingLockId on coll1, since replica2 has not been locked yet",
+        replica2Lock.isAcquired());
+    replica2Lock.release();
+    replica1Lock.release();
+    coll1Lock.release();
+
+    // Shard-level callingLockIds: only the lock on the same collection/shard can be reused
+    DistributedMultiLock shard1Lock1 =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA),
+            "coll1",
+            "shard1",
+            null);
+    assertEquals("Wrong number of internalLocks", 2, shard1Lock1.getCountInternalLocks());
+    assertTrue(
+        "Should be able to lock coll1/shard1 when not using a callingLockId since shard1 has not been locked yet",
+        shard1Lock1.isAcquired());
+    shard1Lock2 =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA),
+            "coll2",
+            "shard1",
+            null);
+    assertEquals("Wrong number of internalLocks", 2, shard1Lock2.getCountInternalLocks());
+    assertTrue(
+        "Should be able to lock coll2/shard1 when not using a callingLockId since shard1 has not been locked yet for coll2",
+        shard1Lock2.isAcquired());
+    String badLockId = shard1Lock2.getLockId();
+    assertThrows(
+        "Should not be able to lock coll1/shard1 since our callingLockId is coll2/shard1",
+        SolrException.class,
+        () ->
+            apiLockingHelper.createCollectionApiLock(
+                new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA)
+                    .withCallingLockId(badLockId),
+                "coll1",
+                "shard1",
+                null));
+    DistributedMultiLock shard1Lock3 =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.INSTALLSHARDDATA)
+                .withCallingLockId(shard1Lock1.getLockId()),
+            "coll1",
+            "shard1",
+            null);
+    assertEquals("Wrong number of internalLocks", 2, shard1Lock3.getCountInternalLocks());
+    assertTrue(
+        "Should be able to lock coll1/shard1 since our callingLockId is coll1/shard1",
+        shard1Lock3.isAcquired());
+    shard1Lock2.release();
+    shard1Lock3.release();
+
+    // Moving up: a held shard lock blocks its parent collection, and a shard-level
+    // callingLockId cannot be used to lock that collection
+    coll1Lock2 =
+        apiLockingHelper.createCollectionApiLock(
+            new AdminCmdContext(CollectionParams.CollectionAction.RELOAD), "coll1", null, null);
+    assertEquals("Wrong number of internalLocks", 1, coll1Lock2.getCountInternalLocks());
+    assertFalse(
+        "Should not be able to lock coll1 since we have no callingLockId and shard1 is already locked. Cannot move up",
+        coll1Lock2.isAcquired());
+    coll1Lock2.release();
+    assertExceptionThrownWithMessageContaining(
+        SolrException.class,
+        List.of("Cannot mirror lock"),
+        () ->
+            apiLockingHelper.createCollectionApiLock(
+                new AdminCmdContext(CollectionParams.CollectionAction.RELOAD)
+                    .withCallingLockId(shard1Lock1.getLockId()),
+                "coll1",
+                null,
+                null));
+
+    // Unrelated collection: a coll1/shard1 callingLockId is rejected for coll2, which is unlocked
+    assertThrows(
+        "Should not be able to lock coll2 even with a callingLockId, since the callingLockId is for coll1/shard1 and unrelated",
+        SolrException.class,
+        () ->
+            apiLockingHelper.createCollectionApiLock(
+                new AdminCmdContext(CollectionParams.CollectionAction.CREATE)
+                    .withCallingLockId(shard1Lock1.getLockId()),
+                "coll2",
+                null,
+                null));
+    shard1Lock1.release();
+  }
}
diff --git
a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
index 74a8e5dd869..050e32bb977 100644
---
a/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
+++
b/solr/solrj/src/java/org/apache/solr/common/params/CollectionAdminParams.java
@@ -124,4 +124,6 @@ public interface CollectionAdminParams {
String PROPERTY_PREFIX = "property.";
String PER_REPLICA_STATE = CollectionStateProps.PER_REPLICA_STATE;
+
+ String CALLING_LOCK_ID_HEADER = "callingLockId";
}
diff --git
a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
index 6ae82508df4..a066ff1e89b 100644
--- a/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
+++ b/solr/solrj/src/java/org/apache/solr/common/params/CollectionParams.java
@@ -69,6 +69,14 @@ public interface CollectionParams {
     public boolean isHigherOrEqual(LockLevel that) {
       return height >= that.height;
     }
+
+    /**
+     * Returns whether this level has exactly the same height as {@code that}. Unlike {@link
+     * #isHigherOrEqual(LockLevel)}, a strictly higher level does not match.
+     */
+    public boolean isEqual(LockLevel that) {
+      return this.height == that.height;
+    }
   }
/**
@@ -136,7 +140,7 @@ public interface CollectionParams {
// TODO when we have a node level lock use it here
BALANCE_REPLICAS(true, LockLevel.NONE),
DELETENODE(true, LockLevel.NONE),
- MOCK_REPLICA_TASK(false, LockLevel.REPLICA),
+ MOCK_REPLICA_TASK(true, LockLevel.REPLICA),
NONE(false, LockLevel.NONE),
// TODO: not implemented yet
MERGESHARDS(true, LockLevel.SHARD),