This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 6cb48373558061ecfa6a5a7657c55ce9aca614c6
Author: Sergi Vladykin <sergi.vlady...@gmail.com>
AuthorDate: Sun Feb 24 16:00:37 2019 +0300

    fixing concurrent issues
---
 .../cache/persistence/tree/BPlusTree.java          | 137 ++++++++++++++++++---
 .../database/BPlusTreeReuseSelfTest.java           |   2 +
 2 files changed, 123 insertions(+), 16 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index f911280..2eba7cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -1322,6 +1322,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
         try {
             for (;;) {
+                if (g.needGetHigher(lvl))
+                    return RETRY;
+
                 g.checkLockRetry();
 
                 // Init args.
@@ -1928,6 +1931,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             Result res = RETRY;
 
             for (;;) {
+                if (x.needGetHigher(lvl))
+                    return RETRY;
+
                 if (res == RETRY)
                     x.checkLockRetry();
 
@@ -2098,6 +2104,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
         try {
             for (;;) {
+                if (r.needGetHigher(lvl))
+                    return RETRY;
+
                 r.checkLockRetry();
 
                 // Init args.
@@ -2593,7 +2602,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     /**
      * @param pageId Page ID.
      * @param page Page pointer.
-     * @param pageAddr Page address
+     * @param pageAddr Page address.
      * @param io IO.
      * @param fwdId Forward page ID.
      * @param fwdBuf Forward buffer.
@@ -2630,7 +2639,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
     /**
      * @param pageId Page ID.
      * @param page Page pointer.
-     * @param pageAddr Page address
+     * @param pageAddr Page address.
      * @param walPlc Full page WAL record policy.
      */
     private void writeUnlockAndClose(long pageId, long page, long pageAddr, 
Boolean walPlc) {
@@ -2669,6 +2678,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
         try {
             for (;;) {
+                if (p.needGetHigher(lvl))
+                    return RETRY;
+
                 p.checkLockRetry();
 
                 // Init args.
@@ -2890,11 +2902,16 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         int lockRetriesCnt = getLockRetries();
 
         /**
-         * Used when invokeAll operation updated one or more keys on the left 
side of the tree
+         * Used when {@link InvokeAll} or another batch operation updated one 
or more keys on the left side of the tree
          * and needs to get high enough to find the rest of rows on the right 
side.
          */
         boolean gettingHigh;
 
+        /**
+         * Used in batch operations like {@link InvokeAll}: the level to get up to for the next row (a negative value means the level is finalized).
+         */
+        int gettingHighLvl;
+
         /** Used in batch operations like {@link InvokeAll}. */
         L nextRow;
 
@@ -2979,6 +2996,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             assert !gettingHigh;
             gettingHigh = true;
 
+            if (gettingHighLvl < 0) // Unfinalize high level.
+                gettingHighLvl = -gettingHighLvl;
+
             // Reinitialize state to continue working with the next row.
             lockRetriesCnt = getLockRetries();
 
@@ -2996,6 +3016,76 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         }
 
         /**
+         * Check for batch operations needed after switching to the next row.
+         *
+         * @param lvl Current level.
+         * @return {@code true} If need to get higher.
+         */
+        final boolean needGetHigher(int lvl) {
+            return gettingHigh && lvl < Math.abs(gettingHighLvl);
+        }
+
+        /**
+         * If we are not on the right edge of the page or there are no forward 
pages,
+         * then we are high enough to have valid search from here.
+         *
+         * @param idx Insertion point.
+         * @param cnt Row count.
+         * @return {@code true} If this search result is always valid and can 
be used.
+         */
+        private boolean isValidSearch(int idx, int cnt) {
+            return -idx - 1 != cnt || fwdId == 0L;
+        }
+
+        /**
+         * Marks level to get high with the next row after the current row 
will be processed.
+         *
+         * @param lvl Current level.
+         * @param io Page IO.
+         * @param pageAddr Page address.
+         * @param idx Index of the current row.
+         * @param cnt Row count.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void markHighLevelForNextRow(int lvl, BPlusIO<L> io, long 
pageAddr, int cnt)
+            throws IgniteCheckedException {
+            // If the high level is finalized we do not need to mark lower 
levels.
+            if (gettingHighLvl < 0 && lvl < -gettingHighLvl)
+                return;
+
+            // Compare the next row with the rightmost row in the page (the 
next search row must always be greater than the current one).
+            // Mark the level if the rightmost row is greater or equal to the 
next search row.
+            // For all the rightmost pages with no forwards (including the 
root page) we always will have valid search.
+            if (fwdId == 0L || compare(io, pageAddr, cnt - 1, nextRow) >= 0)
+                gettingHighLvl = lvl;
+            else {
+                // Because we unfinalize it after each switch to the next row.
+                assert gettingHighLvl >= 0: gettingHighLvl + " " + lvl;
+
+                // If the marked high level is lower than the current level, 
it means that we jumped high after
+                // processing previous rows and comparing here the new next 
row for the first time. We have already
+                // compared this new next row to the page bound and it is known 
to be greater, it means that
+                // the correct high level for the new next row is even higher 
than the current level and we do not know
+                // exactly where it is. Thus, we just guess it is somewhere 
between lvl and rootLvl.
+                // We do not want to mark high levels for more than one next 
row because in presence of
+                // concurrent modifications these levels often will be wrong, 
producing pointless extra work,
+                // there also will be space overhead of storing them.
+                if (gettingHighLvl <= lvl) {
+                    assert rootLvl > lvl; // Because otherwise we must be in 
the root here and must end up marking level in prev branch.
+
+                    // Heuristic to guess level for the next row.
+                    gettingHighLvl = (lvl + rootLvl) >>> 1;
+
+                    assert gettingHighLvl < rootLvl; // We should not touch 
root to avoid contention on it.
+                }
+
+                // Finalize the previous marked level since the next row is 
out of bounds in the current subtree.
+                // We do this finalization to avoid extra work on lower 
levels, where we already know the high level.
+                gettingHighLvl = -gettingHighLvl - 1;
+            }
+        }
+
+        /**
          * @param lvl Level.
          * @param io Page IO.
          * @param pageAddr Page address.
@@ -3003,43 +3093,55 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @return Insertion point.
          * @throws IgniteCheckedException If failed.
          */
-        int findInsertionPointx(int lvl, BPlusIO<L> io, long pageAddr, int 
cnt) throws IgniteCheckedException {
-            return gettingHigh ?
-                findInsertionPointGettingHigh(lvl, io, pageAddr, cnt) :
-                findInsertionPoint(lvl, io, pageAddr, 0, cnt, row, shift);
+        final int findInsertionPointx(int lvl, BPlusIO<L> io, long pageAddr, 
int cnt)
+            throws IgniteCheckedException {
+            int idx;
+
+            if (gettingHigh) {
+                assert lvl >= Math.abs(gettingHighLvl); // We should not get 
here until we are high enough.
+
+                idx = findInsertionPointStartingFromRight(row, lvl, io, 
pageAddr, cnt);
+
+                if (isValidSearch(idx, cnt))
+                    gettingHigh = false;
+            }
+            else
+                idx = findInsertionPoint(lvl, io, pageAddr, 0, cnt, row, 
shift);
+
+            if (nextRow != null && !gettingHigh)
+                markHighLevelForNextRow(lvl, io, pageAddr, cnt);
+
+            return idx;
         }
 
         /**
-         * For compound operations like {@link InvokeAll}.
+         * Like usual {@link #findInsertionPoint} but starting with the 
rightmost row instead of the middle row.
          *
+         * @param searchRow Search row.
          * @param lvl Level.
          * @param io Page IO.
          * @param pageAddr Page address.
          * @param cnt Row count.
          * @return Insertion point.
          */
-        protected final int findInsertionPointGettingHigh(int lvl, BPlusIO<L> 
io, long pageAddr, int cnt) throws IgniteCheckedException {
+        final int findInsertionPointStartingFromRight(L searchRow, int lvl, 
BPlusIO<L> io, long pageAddr, int cnt)
+            throws IgniteCheckedException {
             int idx;
 
             if (cnt == 0)
                 idx = -1; // The page is empty, nothing to search (and need to 
get higher if there are forward pages).
             else {
                 // Try to compare with the rightmost row before doing binary 
search.
-                int cmp = compare(lvl, io, pageAddr, cnt - 1, row);
+                int cmp = compare(lvl, io, pageAddr, cnt - 1, searchRow);
 
                 if (cmp > 0) // Search row is less than the last row in the 
page, need to do binary search.
-                    idx = findInsertionPoint(lvl, io, pageAddr, 0, cnt, row, 
shift);
+                    idx = findInsertionPoint(lvl, io, pageAddr, 0, cnt, 
searchRow, shift);
                 else if (cmp == 0)
                     idx = cnt - 1; // Search row is equal to the last row in 
the page.
                 else
                     idx = -cnt - 1; // Search row is greater than the last row 
in the page.
             }
 
-            // If we are not on the right edge of the page or there are no 
forward pages,
-            // then we are high enough to have valid search from here.
-            if (-idx - 1 != cnt || fwdId == 0L)
-                gettingHigh = false;
-
             return idx;
         }
 
@@ -3054,6 +3156,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             backId = g.backId;
             shift = g.shift;
             findLast = g.findLast;
+            // Do not copy batch related fields.
         }
 
         /**
@@ -3078,6 +3181,8 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             this.rootId = rootId;
             this.rootLvl = rootLvl;
             this.rmvId = rmvId;
+            gettingHigh = false;
+            gettingHighLvl = rootLvl;
         }
 
         /**
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
index 417efd1..38002c2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeReuseSelfTest.java
@@ -355,6 +355,8 @@ public class BPlusTreeReuseSelfTest extends 
BPlusTreeSelfTest {
 
             tree.invokeAll(all.iterator(), null, new UpdateAllClosure());
 
+            tree.validateTree();
+
             assertEqualContents(tree, set);
         }
     }

Reply via email to