Till Westmann has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/694
Change subject: WORK IN PROGRESS
......................................................................
WORK IN PROGRESS
no more implicit dataset locks
Change-Id: I774dd40113f5fa4fee94dde72f4ec6a1891c6b6e
---
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
M
asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
M
asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
3 files changed, 39 insertions(+), 150 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/94/694/1
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
index e268134..c1333da 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/locking/ConcurrentLockManager.java
@@ -47,6 +47,9 @@
static final Logger LOGGER =
Logger.getLogger(ConcurrentLockManager.class.getName());
static final Level LVL = Level.FINER;
+ public static final int NIL = -1;
+ public static final long NILL = -1l;
+
public static final boolean DEBUG_MODE = false;//true
public static final boolean CHECK_CONSISTENCY = false;
@@ -55,7 +58,6 @@
private RequestArenaManager reqArenaMgr;
private JobArenaManager jobArenaMgr;
private ConcurrentHashMap<Integer, Long> jobIdSlotMap;
- private ThreadLocal<DatasetLockCache> dsLockCache;
private LockManagerStats stats = new LockManagerStats(10000);
enum LockAction {
@@ -95,12 +97,6 @@
reqArenaMgr = new RequestArenaManager(noArenas,
lockManagerShrinkTimer);
jobArenaMgr = new JobArenaManager(noArenas, lockManagerShrinkTimer);
jobIdSlotMap = new ConcurrentHashMap<>();
- dsLockCache = new ThreadLocal<DatasetLockCache>() {
- @Override
- protected DatasetLockCache initialValue() {
- return new DatasetLockCache();
- }
- };
}
@Override
@@ -111,17 +107,7 @@
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
-
- if (entityHashValue != -1) {
- lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
- } else {
- if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
- return;
- }
- }
-
final long jobSlot = findOrAllocJobSlot(jobId);
-
final ResourceGroup group = table.get(dsId, entityHashValue);
group.getLatch();
try {
@@ -159,9 +145,6 @@
default:
throw new IllegalStateException();
}
- }
- if (entityHashValue == -1) {
- dsLockCache.get().put(jobId, dsId, lockMode);
}
} catch (InterruptedException e) {
throw new WaitInterruptedException(txnContext, "interrupted", e);
@@ -336,15 +319,8 @@
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
-
- if (entityHashValue != -1) {
- lock(datasetId, -1, LockMode.intentionMode(lockMode), txnContext);
- } else {
- throw new UnsupportedOperationException("instant locks are not
supported on datasets");
- }
-
final ResourceGroup group = table.get(dsId, entityHashValue);
- if (group.firstResourceIndex.get() == -1l) {
+ if (group.firstResourceIndex.get() == NILL) {
validateJob(txnContext);
// if we do not have a resource in the group, we know that the
// resource that we are looking for is not locked
@@ -352,7 +328,7 @@
}
// we only allocate a request slot if we actually have to wait
- long reqSlot = -1;
+ long reqSlot = NILL;
group.getLatch();
try {
@@ -374,7 +350,7 @@
return;
case WAIT:
case CONV:
- if (reqSlot == -1) {
+ if (reqSlot == NILL) {
reqSlot = allocRequestSlot(resSlot, jobSlot,
lockMode);
}
enqueueWaiter(group, reqSlot, resSlot, jobSlot, act,
txnContext);
@@ -387,7 +363,7 @@
} catch (InterruptedException e) {
throw new WaitInterruptedException(txnContext, "interrupted", e);
} finally {
- if (reqSlot != -1) {
+ if (reqSlot != NILL) {
// deallocate request, if we allocated one earlier
if (DEBUG_MODE)
LOGGER.finer("del req slot " +
TypeUtil.Global.toString(reqSlot));
@@ -405,17 +381,6 @@
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
-
- if (entityHashValue != -1) {
- if (!tryLock(datasetId, -1, LockMode.intentionMode(lockMode),
txnContext)) {
- return false;
- }
- } else {
- if (dsLockCache.get().contains(jobId, dsId, lockMode)) {
- return true;
- }
- }
-
final long jobSlot = findOrAllocJobSlot(jobId);
final ResourceGroup group = table.get(dsId, entityHashValue);
@@ -434,9 +399,6 @@
// no break
case GET:
addHolder(reqSlot, resSlot, jobSlot);
- if (entityHashValue == -1) {
- dsLockCache.get().put(jobId, dsId, lockMode);
- }
return true;
case WAIT:
case CONV:
@@ -447,9 +409,6 @@
} finally {
group.releaseLatch();
}
-
- // if we did acquire the dataset lock, but not the entity lock, we keep
- // it anyway and clean it up at the end of the job
}
@Override
@@ -460,17 +419,8 @@
final int dsId = datasetId.getId();
final int jobId = txnContext.getJobId().getId();
-
- if (entityHashValue != -1) {
- if (!tryLock(datasetId, -1, LockMode.intentionMode(lockMode),
txnContext)) {
- return false;
- }
- } else {
- throw new UnsupportedOperationException("instant locks are not
supported on datasets");
- }
-
final ResourceGroup group = table.get(dsId, entityHashValue);
- if (group.firstResourceIndex.get() == -1l) {
+ if (group.firstResourceIndex.get() == NILL) {
validateJob(txnContext);
// if we do not have a resource in the group, we know that the
// resource that we are looking for is not locked
@@ -561,15 +511,11 @@
} finally {
group.releaseLatch();
}
-
- // dataset intention locks are
- // a) kept in dsLockCache and
- // b) cleaned up only in releaseLocks at the end of the job
}
@Override
public void releaseLocks(ITransactionContext txnContext) throws
ACIDException {
- log("releaseLocks", -1, -1, LockMode.ANY, txnContext);
+ log("releaseLocks", NIL, NIL, LockMode.ANY, txnContext);
stats.releaseLocks();
int jobId = txnContext.getJobId().getId();
@@ -588,7 +534,7 @@
synchronized (jobArenaMgr) {
holder = jobArenaMgr.getLastHolder(jobSlot);
}
- while (holder != -1) {
+ while (holder != NILL) {
long resource = reqArenaMgr.getResourceId(holder);
int dsId = resArenaMgr.getDatasetId(resource);
int pkHashVal = resArenaMgr.getPkHashVal(resource);
@@ -629,7 +575,7 @@
private long findOrAllocResourceSlot(ResourceGroup group, int dsId, int
entityHashValue) {
long resSlot = findResourceInGroup(group, dsId, entityHashValue);
- if (resSlot == -1) {
+ if (resSlot == NILL) {
// we don't know about this resource, let's alloc a slot
resSlot = resArenaMgr.allocate();
resArenaMgr.setDatasetId(resSlot, dsId);
@@ -690,7 +636,7 @@
// carefully distinguishing the different lock modes
long holder = resArenaMgr.getLastHolder(resource);
LockAction res = LockAction.WAIT;
- while (holder != -1) {
+ while (holder != NILL) {
if (job == reqArenaMgr.getJobSlot(holder)) {
if (reqArenaMgr.getLockMode(holder) == lockMode) {
return LockAction.GET;
@@ -706,7 +652,7 @@
private long findResourceInGroup(ResourceGroup group, int dsId, int
entityHashValue) {
stats.logCounters(LOGGER, Level.INFO, false);
long resSlot = group.firstResourceIndex.get();
- while (resSlot != -1) {
+ while (resSlot != NILL) {
// either we already have a lock on this resource or we have a
// hash collision
if (resArenaMgr.getDatasetId(resSlot) == dsId &&
resArenaMgr.getPkHashVal(resSlot) == entityHashValue) {
@@ -715,7 +661,7 @@
resSlot = resArenaMgr.getNext(resSlot);
}
}
- return -1;
+ return NILL;
}
private void addHolder(long request, long resource, long job) {
@@ -732,7 +678,7 @@
private boolean hasOtherHolders(long resSlot, long jobSlot) {
long holder = resArenaMgr.getLastHolder(resSlot);
- while (holder != -1) {
+ while (holder != NILL) {
if (reqArenaMgr.getJobSlot(holder) != jobSlot) {
return true;
}
@@ -771,10 +717,10 @@
private long removeRequestFromJob(long holder, long unmodified) {
long prevForJob = reqArenaMgr.getPrevJobRequest(holder);
long nextForJob = reqArenaMgr.getNextJobRequest(holder);
- if (nextForJob != -1) {
+ if (nextForJob != NILL) {
reqArenaMgr.setPrevJobRequest(nextForJob, prevForJob);
}
- if (prevForJob == -1) {
+ if (prevForJob == NILL) {
return nextForJob;
} else {
reqArenaMgr.setNextJobRequest(prevForJob, nextForJob);
@@ -792,8 +738,8 @@
@Override
public void add(long request, long resource, long job) {
long waiter = resArenaMgr.getFirstWaiter(resource);
- reqArenaMgr.setNextRequest(request, -1);
- if (waiter == -1) {
+ reqArenaMgr.setNextRequest(request, NILL);
+ if (waiter == NILL) {
resArenaMgr.setFirstWaiter(resource, request);
} else {
appendToRequestQueue(waiter, request);
@@ -826,8 +772,8 @@
@Override
public void add(long request, long resource, long job) {
long upgrader = resArenaMgr.getFirstUpgrader(resource);
- reqArenaMgr.setNextRequest(request, -1);
- if (upgrader == -1) {
+ reqArenaMgr.setNextRequest(request, NILL);
+ if (upgrader == NILL) {
resArenaMgr.setFirstUpgrader(resource, request);
} else {
appendToRequestQueue(upgrader, request);
@@ -858,7 +804,7 @@
private void insertIntoJobQueue(long newRequest, long oldRequest) {
reqArenaMgr.setNextJobRequest(newRequest, oldRequest);
- reqArenaMgr.setPrevJobRequest(newRequest, -1);
+ reqArenaMgr.setPrevJobRequest(newRequest, NILL);
if (oldRequest >= 0) {
reqArenaMgr.setPrevJobRequest(oldRequest, newRequest);
}
@@ -866,7 +812,7 @@
private void appendToRequestQueue(long head, long appendee) {
long next = reqArenaMgr.getNextRequest(head);
- while (next != -1) {
+ while (next != NILL) {
head = next;
next = reqArenaMgr.getNextRequest(head);
}
@@ -876,9 +822,9 @@
private long removeRequestFromQueueForSlot(long head, long reqSlot) {
long cur = head;
long prev = cur;
- while (prev != -1) {
+ while (prev != NILL) {
cur = reqArenaMgr.getNextRequest(prev);
- if (cur == -1) {
+ if (cur == NILL) {
throw new IllegalStateException("request " + reqSlot + " not
in queue");
}
if (cur == reqSlot) {
@@ -907,9 +853,9 @@
private long removeRequestFromQueueForJob(long head, long jobSlot, byte
lockMode) {
long holder = head;
long prev = holder;
- while (prev != -1) {
+ while (prev != NILL) {
holder = reqArenaMgr.getNextRequest(prev);
- if (holder == -1) {
+ if (holder == NILL) {
throw new IllegalStateException("no entry for job " + jobSlot
+ " in queue");
}
if (requestMatches(holder, jobSlot, lockMode)) {
@@ -925,7 +871,7 @@
private int determineNewMaxMode(long resource, int oldMaxMode) {
int newMaxMode = LockMode.NL;
long holder = resArenaMgr.getLastHolder(resource);
- while (holder != -1) {
+ while (holder != NILL) {
int curLockMode = reqArenaMgr.getLockMode(holder);
if (curLockMode == oldMaxMode) {
// we have another lock of the same mode - we're done
@@ -948,8 +894,8 @@
}
private boolean resourceNotUsed(long resource) {
- return resArenaMgr.getLastHolder(resource) == -1 &&
resArenaMgr.getFirstUpgrader(resource) == -1
- && resArenaMgr.getFirstWaiter(resource) == -1;
+ return resArenaMgr.getLastHolder(resource) == NILL &&
resArenaMgr.getFirstUpgrader(resource) == NILL
+ && resArenaMgr.getFirstWaiter(resource) == NILL;
}
private void validateJob(ITransactionContext txnContext) throws
ACIDException {
@@ -976,10 +922,10 @@
}
StringBuilder sb = new StringBuilder();
sb.append("{ op : ").append(string);
- if (id != -1) {
+ if (id != NIL) {
sb.append(" , dataset : ").append(id);
}
- if (entityHashValue != -1) {
+ if (entityHashValue != NIL) {
sb.append(" , entity : ").append(entityHashValue);
}
if (lockMode != LockMode.NL) {
@@ -1000,11 +946,11 @@
if (group.tryLatch(100, TimeUnit.MILLISECONDS)) {
try {
long resSlot = group.firstResourceIndex.get();
- while (resSlot != -1) {
+ while (resSlot != NILL) {
int dsId = resArenaMgr.getDatasetId(resSlot);
int entityHashValue =
resArenaMgr.getPkHashVal(resSlot);
long reqSlot = resArenaMgr.getLastHolder(resSlot);
- while (reqSlot != -1) {
+ while (reqSlot != NILL) {
byte lockMode = (byte)
reqArenaMgr.getLockMode(reqSlot);
long jobSlot = reqArenaMgr.getJobSlot(reqSlot);
int jobId = jobArenaMgr.getJobId(jobSlot);
@@ -1026,7 +972,7 @@
}
private void assertLockCanBeFoundInJobQueue(int dsId, int entityHashValue,
byte lockMode, int jobId) {
- if (findLockInJobQueue(dsId, entityHashValue, jobId, lockMode) == -1) {
+ if (findLockInJobQueue(dsId, entityHashValue, jobId, lockMode) ==
NILL) {
String msg = "request for " + LockMode.toString(lockMode) + " lock
on dataset " + dsId + " entity "
+ entityHashValue + " not found for job " + jobId + " in
thread "
+ Thread.currentThread().getName();
@@ -1046,19 +992,19 @@
* job id
* @param lockMode
* lock mode
- * @return the slot of the request, if the lock request is found, -1
otherwise
+ * @return the slot of the request, if the lock request is found, NILL
otherwise
*/
private long findLockInJobQueue(final int dsId, final int entityHashValue,
final int jobId, byte lockMode) {
Long jobSlot = jobIdSlotMap.get(jobId);
if (jobSlot == null) {
- return -1;
+ return NILL;
}
long holder;
synchronized (jobArenaMgr) {
holder = jobArenaMgr.getLastHolder(jobSlot);
}
- while (holder != -1) {
+ while (holder != NILL) {
long resource = reqArenaMgr.getResourceId(holder);
if (dsId == resArenaMgr.getDatasetId(resource) && entityHashValue
== resArenaMgr.getPkHashVal(resource)
&& jobSlot == reqArenaMgr.getJobSlot(holder)
@@ -1069,7 +1015,7 @@
holder = reqArenaMgr.getNextJobRequest(holder);
}
}
- return -1;
+ return NILL;
}
private TablePrinter getResourceTablePrinter() {
@@ -1115,46 +1061,4 @@
dumpState(os);
}
}
-
- private static class DatasetLockCache {
- private long jobId = -1;
- private HashMap<Integer, Byte> lockCache = new HashMap<Integer,
Byte>();
- // size 1 cache to avoid the boxing/unboxing that comes with the
- // access to the HashMap
- private int cDsId = -1;
- private byte cDsLockMode = -1;
-
- public boolean contains(final int jobId, final int dsId, byte
dsLockMode) {
- if (this.jobId == jobId) {
- if (this.cDsId == dsId && this.cDsLockMode == dsLockMode) {
- return true;
- }
- final Byte cachedLockMode = this.lockCache.get(dsId);
- if (cachedLockMode != null && cachedLockMode == dsLockMode) {
- this.cDsId = dsId;
- this.cDsLockMode = dsLockMode;
- return true;
- }
- } else {
- this.jobId = -1;
- this.cDsId = -1;
- this.cDsLockMode = -1;
- this.lockCache.clear();
- }
- return false;
- }
-
- public void put(final int jobId, final int dsId, byte dsLockMode) {
- this.jobId = jobId;
- this.cDsId = dsId;
- this.cDsLockMode = dsLockMode;
- this.lockCache.put(dsId, dsLockMode);
- }
-
- @Override
- public String toString() {
- return "[ " + jobId + " : " + lockCache.toString() + "]";
- }
- }
-
}
diff --git
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
index 5df230b..ac964a5 100644
---
a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
+++
b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManagementConstants.java
@@ -24,11 +24,6 @@
*/
public class TransactionManagementConstants {
- public static class ResourceMgrIds {
- public static final byte BTREE_RESOURCE_MGR_ID = 1;
- public static final byte METADATA_RESOURCE_MGR_ID = 2;
- }
-
public static class LogManagerConstants {
public static final int TERMINAL_LSN = -1;
}
@@ -41,15 +36,6 @@
public static final byte IX = 2;
public static final byte S = 3;
public static final byte X = 4;
-
- public static byte intentionMode(byte mode) {
- switch (mode) {
- case S: return IS;
- case X: return IX;
- default: throw new IllegalArgumentException(
- "no intention lock mode for " + toString(mode));
- }
- }
public static String toString(byte mode) {
switch (mode) {
diff --git
a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
index 5cabd04..8725114 100644
---
a/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
+++
b/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/LockManagerUnitTest.java
@@ -87,7 +87,6 @@
List<Request> reqs = new ArrayList<>();
reqs.add(req(Kind.LOCK, j(1), d(1), e(1), LockMode.S));
reqs.add(req(Kind.UNLOCK, j(1), d(1), e(1), LockMode.S));
- reqs.add(req(Kind.UNLOCK, j(1), d(1), e(-1), LockMode.IS));
reportErrors(execute(reqs));
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/694
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I774dd40113f5fa4fee94dde72f4ec6a1891c6b6e
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Till Westmann <[email protected]>