This is an automated email from the ASF dual-hosted git repository. sboikov pushed a commit to branch ignite-invokeAll in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 6cb48373558061ecfa6a5a7657c55ce9aca614c6 Author: Sergi Vladykin <sergi.vlady...@gmail.com> AuthorDate: Sun Feb 24 16:00:37 2019 +0300 fixing concurrent issues --- .../cache/persistence/tree/BPlusTree.java | 137 ++++++++++++++++++--- .../database/BPlusTreeReuseSelfTest.java | 2 + 2 files changed, 123 insertions(+), 16 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index f911280..2eba7cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -1322,6 +1322,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements try { for (;;) { + if (g.needGetHigher(lvl)) + return RETRY; + g.checkLockRetry(); // Init args. @@ -1928,6 +1931,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements Result res = RETRY; for (;;) { + if (x.needGetHigher(lvl)) + return RETRY; + if (res == RETRY) x.checkLockRetry(); @@ -2098,6 +2104,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements try { for (;;) { + if (r.needGetHigher(lvl)) + return RETRY; + r.checkLockRetry(); // Init args. @@ -2593,7 +2602,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param pageId Page ID. * @param page Page pointer. - * @param pageAddr Page address + * @param pageAddr Page address. * @param io IO. * @param fwdId Forward page ID. * @param fwdBuf Forward buffer. @@ -2630,7 +2639,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements /** * @param pageId Page ID. * @param page Page pointer. - * @param pageAddr Page address + * @param pageAddr Page address. * @param walPlc Full page WAL record policy. */ private void writeUnlockAndClose(long pageId, long page, long pageAddr, Boolean walPlc) { @@ -2669,6 +2678,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements try { for (;;) { + if (p.needGetHigher(lvl)) + return RETRY; + p.checkLockRetry(); // Init args. @@ -2890,11 +2902,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements int lockRetriesCnt = getLockRetries(); /** - * Used when invokeAll operation updated one or more keys on the left side of the tree + * Used when {@link InvokeAll} or another batch operation updated one or more keys on the left side of the tree * and needs to get high enough to find the rest of rows on the right side. */ boolean gettingHigh; + /** + * Used in batch operations like {@link InvokeAll}: + */ + int gettingHighLvl; + /** Used in batch operations like {@link InvokeAll}. */ L nextRow; @@ -2979,6 +2996,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements assert !gettingHigh; gettingHigh = true; + if (gettingHighLvl < 0) // Unfinalize high level. + gettingHighLvl = -gettingHighLvl; + // Reinitialize state to continue working with the next row. lockRetriesCnt = getLockRetries(); @@ -2996,6 +3016,76 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements } /** + * Check for batch operations needed after switching to the next row. + * + * @param lvl Current level. + * @return {@code true} If need to get higher. + */ + final boolean needGetHigher(int lvl) { + return gettingHigh && lvl < Math.abs(gettingHighLvl); + } + + /** + * If we are not on the right edge of the page or there are no forward pages, + * then we are high enough to have valid search from here. + * + * @param idx Insertion point. + * @param cnt Row count. + * @return {@code true} If this search result is always valid and can be used. + */ + private boolean isValidSearch(int idx, int cnt) { + return -idx - 1 != cnt || fwdId == 0L; + } + + /** + * Marks level to get high with the next row after the current row will be processed. + * + * @param lvl Current level. + * @param io Page IO. + * @param pageAddr Page address. + * @param idx Index of the current row. + * @param cnt Row count. + * @throws IgniteCheckedException If failed. + */ + private void markHighLevelForNextRow(int lvl, BPlusIO<L> io, long pageAddr, int cnt) + throws IgniteCheckedException { + // If the high level is finalized we do not need to mark lower levels. + if (gettingHighLvl < 0 && lvl < -gettingHighLvl) + return; + + // Compare the next row with the rightmost row in the page (the next search row must always be greater than the current one). + // Mark the level if the rightmost row is greater or equal to the next search row. + // For all the rightmost pages with no forwards (including the root page) we always will have valid search. + if (fwdId == 0L || compare(io, pageAddr, cnt - 1, nextRow) >= 0) + gettingHighLvl = lvl; + else { + // Because we unfinalize it after each switch to the next row. + assert gettingHighLvl >= 0: gettingHighLvl + " " + lvl; + + // If the marked high level is lower then the current level, it means that we jumped high after + // processing previous rows and comparing here the new next row for the first time. We have already + // compared this new next row to the page bound and it known to be greater, it means that + // the correct high level for the new next row is even higher than the current level and we do not know + // exactly where it is. Thus, we just guess it is somewhere between lvl and rootLvl. + // We do not want to mark high levels for more than one next row because in presence of + // concurrent modifications these levels often will be wrong, producing pointless extra work, + // there also will be space overhead of storing them. + if (gettingHighLvl <= lvl) { + assert rootLvl > lvl; // Because otherwise we must be in the root here and must end up marking level in prev branch. + + // Heuristic to guess level for the next row. + gettingHighLvl = (lvl + rootLvl) >>> 1; + + assert gettingHighLvl < rootLvl; // We should not touch root to avoid contention on it. + } + + // Finalize the previous marked level since the next row is out of bounds in the current subtree. + // We do this finalization to avoid extra work on lower levels, where we already know the high level. + gettingHighLvl = -gettingHighLvl - 1; + } + } + + /** * @param lvl Level. * @param io Page IO. * @param pageAddr Page address. @@ -3003,43 +3093,55 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements * @return Insertion point. * @throws IgniteCheckedException If failed. */ - int findInsertionPointx(int lvl, BPlusIO<L> io, long pageAddr, int cnt) throws IgniteCheckedException { - return gettingHigh ? - findInsertionPointGettingHigh(lvl, io, pageAddr, cnt) : - findInsertionPoint(lvl, io, pageAddr, 0, cnt, row, shift); + final int findInsertionPointx(int lvl, BPlusIO<L> io, long pageAddr, int cnt) + throws IgniteCheckedException { + int idx; + + if (gettingHigh) { + assert lvl >= Math.abs(gettingHighLvl); // We should not get here until we are high enough. + + idx = findInsertionPointStartingFromRight(row, lvl, io, pageAddr, cnt); + + if (isValidSearch(idx, cnt)) + gettingHigh = false; + } + else + idx = findInsertionPoint(lvl, io, pageAddr, 0, cnt, row, shift); + + if (nextRow != null && !gettingHigh) + markHighLevelForNextRow(lvl, io, pageAddr, cnt); + + return idx; } /** - * For compound operations like {@link InvokeAll}. + * Like usual {@link #findInsertionPoint} but starting with the rightmost row instead of the middle row. * + * @param searchRow Search row. * @param lvl Level. * @param io Page IO. * @param pageAddr Page address. * @param cnt Row count. * @return Insertion point. */ - protected final int findInsertionPointGettingHigh(int lvl, BPlusIO<L> io, long pageAddr, int cnt) throws IgniteCheckedException { + final int findInsertionPointStartingFromRight(L searchRow, int lvl, BPlusIO<L> io, long pageAddr, int cnt) + throws IgniteCheckedException { int idx; if (cnt == 0) idx = -1; // The page is empty, nothing to search (and need to get higher if there are forward pages). else { // Try to compare with the rightmost row before doing binary search. - int cmp = compare(lvl, io, pageAddr, cnt - 1, row); + int cmp = compare(lvl, io, pageAddr, cnt - 1, searchRow); if (cmp > 0) // Search row is less than the last row in the page, need to do binary search. - idx = findInsertionPoint(lvl, io, pageAddr, 0, cnt, row, shift); + idx = findInsertionPoint(lvl, io, pageAddr, 0, cnt, searchRow, shift); else if (cmp == 0) idx = cnt - 1; // Search row is equal to the last row in the page. else idx = -cnt - 1; // Search row is greater than the last row in the page. } - // If we are not on the right edge of the page or there are no forward pages, - // then we are high enough to have valid search from here. - if (-idx - 1 != cnt || fwdId == 0L) - gettingHigh = false; - return idx; } @@ -3054,6 +3156,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements backId = g.backId; shift = g.shift; findLast = g.findLast; + // Do not copy batch related fields. } /** @@ -3078,6 +3181,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements this.rootId = rootId; this.rootLvl = rootLvl; this.rmvId = rmvId; + gettingHigh = false; + gettingHighLvl = rootLvl; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java index 417efd1..38002c2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java @@ -355,6 +355,8 @@ public class BPlusTreeReuseSelfTest extends BPlusTreeSelfTest { tree.invokeAll(all.iterator(), null, new UpdateAllClosure()); + tree.validateTree(); + assertEqualContents(tree, set); } }