This is an automated email from the ASF dual-hosted git repository. vpyatkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 92466d5bfc IGNITE-18294 Multiple lock intentions support (#1414) 92466d5bfc is described below commit 92466d5bfce5f6a7efe2d4d6a744c6f67b56d48f Author: Vladislav Pyatkov <vldpyat...@gmail.com> AuthorDate: Wed Dec 7 22:34:02 2022 +0300 IGNITE-18294 Multiple lock intentions support (#1414) --- .../ignite/internal/table/ItTableScanTest.java | 7 +- .../ignite/internal/table/TxAbstractTest.java | 2 +- .../org/apache/ignite/internal/tx/LockMode.java | 2 +- .../java/org/apache/ignite/internal/tx/Waiter.java | 7 + .../ignite/internal/tx/impl/HeapLockManager.java | 236 ++++++++++++--------- .../internal/tx/AbstractLockManagerTest.java | 66 ++---- 6 files changed, 157 insertions(+), 163 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index dcdff459fb..f9fcca89bc 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -180,7 +180,6 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { } @Test - @Disabled("IGNITE-18294 Multiple lock intentions support") public void testUpsertAllDuringPureTableScan() throws Exception { pureTableScan(tx -> { TableImpl table = getOrCreateTable(); @@ -225,7 +224,6 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { } @Test - @Disabled("IGNITE-18294 Multiple lock intentions support") public void testInsertAllDuringPureTableScan() throws Exception { pureTableScan(tx -> { TableImpl table = getOrCreateTable(); @@ -307,7 +305,6 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { } @Test - @Disabled("IGNITE-18294 Multiple lock intentions support") public void testDeleteAllDuringPureTableScan() throws Exception { pureTableScan(tx -> { TableImpl table = getOrCreateTable(); @@ -316,7 +313,7 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { return internalTable.deleteAll(List.of(createKeyRow(6), createKeyRow(10)), tx) .thenApply(deletedRows -> { - assertEquals(2, deletedRows.size()); + assertEquals(0, deletedRows.size()); return -2; }); @@ -341,7 +338,7 @@ public class ItTableScanTest extends AbstractBasicIntegrationTest { } @Test - @Disabled("IGNITE-18299 Value comparison in table operations IGNITE-18294 Multiple lock intentions support") + @Disabled("IGNITE-18299 Value comparison in table operations") public void testDeleteAllExactDuringPureTableScan() throws Exception { pureTableScan(tx -> { TableImpl table = getOrCreateTable(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java index 18fcb681f5..44ad856d9d 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxAbstractTest.java @@ -441,7 +441,7 @@ public abstract class TxAbstractTest extends IgniteAbstractTest { for (Iterator<Lock> it = txManager(accounts).lockManager().locks(tx1.id()); it.hasNext(); ) { Lock lock = it.next(); - lockUpgraded = lock.lockMode() == LockMode.X; + lockUpgraded = txManager(accounts).lockManager().waiter(lock.lockKey(), tx1.id()).intendedLockMode() == LockMode.X; if (lockUpgraded) { break; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockMode.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockMode.java index 1664a35ddf..f3bd20b50e 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockMode.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockMode.java @@ -51,7 +51,7 @@ public enum LockMode { /** Lock mode reenter matrix. */ private static final boolean[][] REENTER_MATRIX = { - {true, true, true, true, true, true}, + {true, false, false, false, false, false}, {true, true, false, false, false, false}, {true, true, true, false, false, false}, {true, true, false, true, false, false}, diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Waiter.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Waiter.java index c74310080f..d3d326aa76 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Waiter.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Waiter.java @@ -43,4 +43,11 @@ public interface Waiter { * @return Lock mode. */ LockMode lockMode(); + + /** + * Gets an intended Lock mode. + * + * @return Lock mode. + */ + LockMode intendedLockMode(); } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java index 0eb6217873..16ef5fcf4d 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java @@ -174,43 +174,38 @@ public class HeapLockManager implements LockManager { WaiterImpl prev = waiters.putIfAbsent(txId, waiter); // Reenter - if (prev != null && prev.locked()) { - if (prev.lockMode.allowReenter(lockMode)) { - prev.addLock(lockMode, 1); + if (prev != null) { + if (prev.locked() && prev.lockMode().allowReenter(lockMode)) { + waiter.lock(); - return new IgniteBiTuple(CompletableFuture.completedFuture(null), lockMode); - } else { - waiter.addLocks(prev.locks); - - waiter.upgraded = true; - - lockMode = LockMode.supremum(prev.lockMode, lockMode); + prev.upgrade(waiter); - waiter.prevLockMode = prev.lockMode; + return new IgniteBiTuple(CompletableFuture.completedFuture(null), prev.lockMode()); + } else { + waiter.upgrade(prev); - waiter.lockMode = lockMode; + assert prev.lockMode() == waiter.lockMode() : + "Lock modes are incorrect [prev=" + prev.lockMode() + ", new=" + waiter.lockMode() + ']'; waiters.put(txId, waiter); // Upgrade. } } if (!isWaiterReadyToNotify(waiter, false)) { - return new IgniteBiTuple(waiter.fut, lockMode); + return new IgniteBiTuple(waiter.fut, waiter.lockMode()); } if (!waiter.locked()) { - if (prev == null) { - waiters.remove(waiter.txId()); - } else { - waiters.put(waiter.txId(), prev); // Restore old lock. - } + waiters.remove(waiter.txId()); + } else if (waiter.hasLockIntent()) { + waiter.refuseIntent(); // Restore old lock. } } // Notify outside the monitor. waiter.notifyLocked(); - return new IgniteBiTuple(waiter.fut, lockMode); + return new IgniteBiTuple(waiter.fut, waiter.lockMode()); } /** @@ -224,7 +219,7 @@ public class HeapLockManager implements LockManager { WaiterImpl tmp = entry.getValue(); LockMode mode = lockedMode(tmp); - if (mode != null && !mode.isCompatible(waiter.lockMode())) { + if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { return false; } } @@ -233,7 +228,7 @@ public class HeapLockManager implements LockManager { WaiterImpl tmp = entry.getValue(); LockMode mode = lockedMode(tmp); - if (mode != null && !mode.isCompatible(waiter.lockMode())) { + if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { if (skipFail) { return false; } else { @@ -245,14 +240,7 @@ public class HeapLockManager implements LockManager { } } - if (waiter.upgraded) { - // Upgrade lock. - waiter.upgraded = false; - waiter.prevLockMode = null; - waiter.locked = true; - } else { - waiter.lock(); - } + waiter.lock(); return true; } @@ -291,20 +279,15 @@ public class HeapLockManager implements LockManager { WaiterImpl waiter = waiters.get(txId); if (waiter != null) { - waiter.removeLock(lockMode); + assert lockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() : + "The lock mode is not locked [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']'; - LockMode modeToDowngrade = waiter.recalculateMode(); + LockMode modeFromDowngrade = waiter.recalculateMode(lockMode); - if (!waiter.locked()) { - assert waiter.lockMode() == modeToDowngrade : "The lock mode is not locked [mode=" + lockMode + ']'; - - return false; - } - - if (modeToDowngrade == null) { + if (!waiter.locked() && !waiter.hasLockIntent()) { toNotify = release(txId); - } else { - toNotify = downgrade(txId, modeToDowngrade); + } else if (modeFromDowngrade != waiter.lockMode()) { + toNotify = unlockCompatibleWaiters(); } } } @@ -350,8 +333,8 @@ public class HeapLockManager implements LockManager { for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) { WaiterImpl tmp = entry.getValue(); - if (!tmp.locked() && isWaiterReadyToNotify(tmp, true)) { - assert tmp.locked() : "This waiter in not locked for notification [waiter=" + tmp + ']'; + if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) { + assert !tmp.hasLockIntent() : "This waiter in not locked for notification [waiter=" + tmp + ']'; toNotify.add(tmp); } @@ -360,8 +343,8 @@ public class HeapLockManager implements LockManager { for (Map.Entry<UUID, WaiterImpl> entry : waiters.entrySet()) { WaiterImpl tmp = entry.getValue(); - if (!tmp.locked() && isWaiterReadyToNotify(tmp, false)) { - assert !tmp.locked() : "Only failed waiter can be notified here [waiter=" + tmp + ']'; + if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, false)) { + assert tmp.hasLockIntent() : "Only failed waiter can be notified here [waiter=" + tmp + ']'; toNotify.add(tmp); toFail.add(tmp.txId()); @@ -369,7 +352,14 @@ public class HeapLockManager implements LockManager { } for (UUID failTx : toFail) { - waiters.remove(failTx); + var w = waiters.get(failTx); + + if (w.locked()) { + w.refuseIntent(); + } else { + waiters.remove(failTx); + } + } return toNotify; @@ -386,42 +376,11 @@ public class HeapLockManager implements LockManager { if (waiter.locked()) { mode = waiter.lockMode(); - } else if (waiter.upgraded) { - mode = waiter.prevLockMode; } return mode; } - /** - * Downgrades a lock on a specific key. - * This method should be invoked synchronously. - * - * @param txId Transaction id. - * @param lockMode Lock mode. - * @return List of waiters to notify. - */ - private List<WaiterImpl> downgrade(UUID txId, LockMode lockMode) { - WaiterImpl waiter = waiters.get(txId); - - if (waiter == null || waiter.lockMode == lockMode) { - return Collections.emptyList(); - } - - assert waiter.lockMode != LockMode.S || lockMode != LockMode.IX : - "Cannot change lock [from=" + waiter.lockMode + ", to=" + lockMode + ']'; - - assert waiter.lockMode.compareTo(lockMode) > 0 : - "Held lock mode have to be more strict than mode to downgrade [from=" + waiter.lockMode + ", to=" + lockMode - + ']'; - - waiter.lockMode = lockMode; - - List<WaiterImpl> toNotify = unlockCompatibleWaiters(); - - return toNotify; - } - /** * Returns a collection of timestamps that is associated with the specified {@code key}. * @@ -451,28 +410,31 @@ public class HeapLockManager implements LockManager { */ private static class WaiterImpl implements Comparable<WaiterImpl>, Waiter { - /** Holding locks by type. */ + /** + * Holding locks by type. + * TODO: IGNITE-18350 Abandon the collection in favor of BitSet. + */ private final Map<LockMode, Integer> locks = new HashMap<>(); + /** + * Lock modes are marked as intended, but have not taken yet. + * TODO: IGNITE-18350 Abandon the collection in favor of BitSet. + */ + private final Set<LockMode> intendedLocks = new HashSet<>(); + /** Locked future. */ @IgniteToStringExclude - private final CompletableFuture<Void> fut; + private CompletableFuture<Void> fut; /** Waiter transaction id. */ private final UUID txId; - /** Upgraded lock. */ - private boolean upgraded; - - /** The previous lock mode. */ - private LockMode prevLockMode; + /** The lock mode to intend to hold. */ + private LockMode intendedLockMode; /** The lock mode. */ private LockMode lockMode; - /** The state. */ - private boolean locked = false; - /** * The filed has a value when the waiter couldn't lock a key. */ @@ -487,9 +449,10 @@ public class HeapLockManager implements LockManager { WaiterImpl(UUID txId, LockMode lockMode) { this.fut = new CompletableFuture<>(); this.txId = txId; - this.lockMode = lockMode; + this.intendedLockMode = lockMode; locks.put(lockMode, 1); + intendedLocks.add(lockMode); } /** @@ -506,47 +469,93 @@ public class HeapLockManager implements LockManager { * Removes a lock mode. * * @param lockMode Lock mode. + * @return True if the lock mode was removed, false otherwise. */ - void removeLock(LockMode lockMode) { + private boolean removeLock(LockMode lockMode) { Integer counter = locks.get(lockMode); if (counter == null || counter < 2) { locks.remove(lockMode); + + return true; } else { locks.put(lockMode, counter - 1); + + return false; } } /** - * Recalculates lock mode based of all locks which the waiter has took. + * Recalculates lock mode based of all locks which the waiter has taken. * - * @return Recalculated lock mode. + * @param modeToRemove Mode without which, the recalculation will happen. + * @return Previous lock mode. */ - LockMode recalculateMode() { - LockMode mode = null; + LockMode recalculateMode(LockMode modeToRemove) { + if (!removeLock(modeToRemove)) { + return lockMode; + } - for (LockMode heldMode : locks.keySet()) { - assert locks.get(heldMode) > 0 : "Incorrect lock counter [txId=" + txId + ", mode=" + heldMode + "]"; + return recalculate(); + } - mode = mode == null ? heldMode : LockMode.supremum(mode, heldMode); + /** + * Recalculates lock supremums. + * + * @return Previous lock mode. + */ + private LockMode recalculate() { + LockMode newIntendedLockMode = null; + LockMode newLockMode = null; + + for (LockMode mode : locks.keySet()) { + assert locks.get(mode) > 0 : "Incorrect lock counter [txId=" + txId + ", mode=" + mode + "]"; + + if (intendedLocks.contains(mode)) { + newIntendedLockMode = newIntendedLockMode == null ? mode : LockMode.supremum(newIntendedLockMode, mode); + } else { + newLockMode = newLockMode == null ? mode : LockMode.supremum(newLockMode, mode); + } } + LockMode mode = lockMode; + + lockMode = newLockMode; + intendedLockMode = newLockMode != null && newIntendedLockMode != null ? LockMode.supremum(newLockMode, newIntendedLockMode) + : newIntendedLockMode; + return mode; } /** - * Adds several locks modes to the waiter. + * Merge all locks that were held by another waiter to the current one. * - * @param locksToAdd Map with lock modes. + * @param other Other waiter. */ - void addLocks(Map<LockMode, Integer> locksToAdd) { - for (LockMode mode : locksToAdd.keySet()) { - Integer inc = locksToAdd.get(mode); + void upgrade(WaiterImpl other) { + intendedLocks.addAll(other.intendedLocks); + + other.locks.entrySet().forEach(entry -> addLock(entry.getKey(), entry.getValue())); - addLock(mode, inc); + recalculate(); + + if (other.hasLockIntent()) { + fut = other.fut; } } + /** + * Removes all locks that were intended to hold. + */ + void refuseIntent() { + for (LockMode mode : intendedLocks) { + locks.remove(mode); + } + + intendedLocks.clear(); + intendedLockMode = null; + } + /** {@inheritDoc} */ @Override public int compareTo(@NotNull WaiterImpl o) { @@ -558,7 +567,7 @@ public class HeapLockManager implements LockManager { if (ex != null) { fut.completeExceptionally(ex); } else { - assert locked; + assert lockMode != null; fut.complete(null); } @@ -567,7 +576,16 @@ public class HeapLockManager implements LockManager { /** {@inheritDoc} */ @Override public boolean locked() { - return this.locked; + return this.lockMode != null; + } + + /** + * Checks is the waiter has any intended to lock a key. + * + * @return True if the waiter has an intended lock, false otherwise. + */ + public boolean hasLockIntent() { + return this.intendedLockMode != null; } /** {@inheritDoc} */ @@ -576,9 +594,19 @@ public class HeapLockManager implements LockManager { return lockMode; } + /** {@inheritDoc} */ + @Override + public LockMode intendedLockMode() { + return intendedLockMode; + } + /** Grant a lock. */ private void lock() { - locked = true; + lockMode = intendedLockMode; + + intendedLockMode = null; + + intendedLocks.clear(); } /** diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java index 10f8a16695..b0208498f1 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractLockManagerTest.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.testframework.IgniteTestUtils; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteException; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -132,15 +131,14 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { assertFalse(fut0.isDone()); CompletableFuture<Lock> fut2 = lockManager.acquire(txId2, key, X); - assertTrue(fut2.isDone()); - assertTrue(fut2.isCompletedExceptionally()); + expectConflict(fut2); CompletableFuture<Lock> fut1 = lockManager.acquire(txId1, key, S); fut1.join(); assertFalse(fut0.isDone()); - lockManager.release(txId2, key, X); + lockManager.release(lockManager.locks(txId2).next()); fut0.thenAccept(lock -> lockManager.release(lock)); } @@ -676,50 +674,16 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { LockKey key = new LockKey("test"); - CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X); - - assertEquals(X, fut0.join().lockMode()); - - List<LockMode> lockModes = List.of(SIX, S, IS); - - LockMode lastLockMode; - - for (LockMode lockMode : lockModes) { - var lockFut = lockManager.acquire(txId0, key, lockMode); - - Waiter waiter = lockManager.waiter(fut0.join().lockKey(), txId0); - - lastLockMode = waiter.lockMode(); - - assertEquals(lastLockMode, waiter.lockMode()); - - lockManager.release(txId0, key, X); - - assertTrue(lockManager.queue(key).size() == 1); - - waiter = lockManager.waiter(fut0.join().lockKey(), txId0); - - assertEquals(lockMode, waiter.lockMode()); - - assertTrue(lockManager.queue(key).size() == 1); - - lockManager.release(lockFut.join()); - } - - fut0 = lockManager.acquire(txId0, key, X); + for (LockMode lockMode : List.of(SIX, S, IS, IX)) { + CompletableFuture<Lock> fut0 = lockManager.acquire(txId0, key, X); - assertEquals(X, fut0.join().lockMode()); + assertEquals(X, fut0.join().lockMode()); - lockModes = List.of(SIX, IX, IS); - - for (LockMode lockMode : lockModes) { var lockFut = lockManager.acquire(txId0, key, lockMode); Waiter waiter = lockManager.waiter(fut0.join().lockKey(), txId0); - lastLockMode = waiter.lockMode(); - - assertEquals(lastLockMode, waiter.lockMode()); + assertEquals(LockMode.supremum(lockMode, X), waiter.lockMode()); lockManager.release(txId0, key, X); @@ -763,7 +727,7 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { for (LockMode lockMode : LockMode.values()) { lockManager.acquire(txId, key, lockMode); - lockManager.release(txId, key, lockMode); + lockManager. release(txId, key, lockMode); } assertTrue(lockManager.locks(txId).hasNext()); @@ -838,7 +802,6 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { } @Test - @Disabled("IGNITE-18294 Multiple lock intentions support") public void testLockingOverloadAndUpgrade() { LockKey key = new LockKey("test"); @@ -864,7 +827,6 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { } @Test - @Disabled("IGNITE-18294 Multiple lock intentions support") public void testLockingOverload() { LockKey key = new LockKey("test"); @@ -893,7 +855,6 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { } @Test - @Disabled("IGNITE-18294 Multiple lock intentions support") public void testFailUpgrade() { LockKey key = new LockKey("test"); @@ -917,16 +878,17 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { lockManager.release(tx3Lock.join()); + expectConflict(tx2xLock); + assertFalse(tx1xLock.isDone()); + + lockManager.release(tx2Lock.join()); + assertTrue(tx1xLock.isDone()); - assertFalse(tx2xLock.isDone()); lockManager.release(tx1xLock.join()); - - assertThat(tx2xLock, willSucceedFast()); } @Test - @Disabled("IGNITE-18294 Multiple lock intentions support") public void testDowngradeTargetLock() { LockKey key = new LockKey("test"); @@ -943,12 +905,12 @@ public abstract class AbstractLockManagerTest extends IgniteAbstractTest { assertFalse(tx1IxLock.isDone()); - assertEquals(SIX, lockManager.locks(tx1).next().lockMode()); + assertEquals(SIX, lockManager.waiter(key, tx1).intendedLockMode()); lockManager.release(tx1, key, S); assertFalse(tx1IxLock.isDone()); - assertEquals(IX, lockManager.locks(tx1).next().lockMode()); + assertEquals(IX, lockManager.waiter(key, tx1).intendedLockMode()); lockManager.release(tx2, key, S);