This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a25ad9  IGNITE-13683 Support of MVCC-enabled caches added to index validation feature. - Fixes #8432.
2a25ad9 is described below

commit 2a25ad92b1d9dc11f77834766f654002803938ba
Author: Semyon Danilov <samvi...@yandex.ru>
AuthorDate: Tue Nov 10 14:32:30 2020 +0300

    IGNITE-13683 Support of MVCC-enabled caches added to index validation feature. - Fixes #8432.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../internal/processors/query/h2/H2Utils.java      |   2 +-
 .../visor/verify/ValidateIndexesClosure.java       | 265 ++++++++++++++-------
 .../database/RebuildIndexWithMVCCTest.java         |  39 +++
 .../testsuites/IgnitePdsWithIndexingTestSuite.java |   2 +
 4 files changed, 215 insertions(+), 93 deletions(-)

diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
index fcd27ce..233b272 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java
@@ -114,7 +114,7 @@ import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_N
  */
 public class H2Utils {
     /** Query context H2 variable name. */
-    static final String QCTX_VARIABLE_NAME = "_IGNITE_QUERY_CONTEXT";
+    public static final String QCTX_VARIABLE_NAME = "_IGNITE_QUERY_CONTEXT";
 
     /**
      * The default precision for a char/varchar value.
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
index 1394687..d010702 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java
@@ -48,6 +48,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
@@ -58,11 +61,14 @@ import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
+import org.apache.ignite.internal.processors.query.h2.ConnectionManager;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow;
+import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
@@ -76,6 +82,7 @@ import org.apache.ignite.resources.LoggerResource;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.Index;
+import org.h2.jdbc.JdbcConnection;
 import org.h2.message.DbException;
 import org.jetbrains.annotations.Nullable;
 
@@ -519,141 +526,215 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
             PartitionUpdateCounter updateCntrBefore = updCntr == null ? null : updCntr.copy();
 
-            GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id());
-
             partRes = new ValidateIndexesPartitionResult();
 
-            boolean enoughIssues = false;
+            boolean hasMvcc = grpCtx.caches().stream().anyMatch(GridCacheContext::mvccEnabled);
 
-            GridQueryProcessor qryProcessor = ignite.context().query();
+            if (hasMvcc) {
+                for (GridCacheContext<?, ?> context : grpCtx.caches()) {
+                    try (Session session = mvccSession(context)) {
+                        MvccSnapshot mvccSnapshot = null;
 
-            final boolean skipConditions = checkFirst > 0 || checkThrough > 0;
-            final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0;
+                        boolean mvccEnabled = context.mvccEnabled();
 
-            long current = 0;
-            long processedNumber = 0;
+                        if (mvccEnabled)
+                            mvccSnapshot = ((QueryContext) session.getVariable(H2Utils.QCTX_VARIABLE_NAME).getObject()).mvccSnapshot();
 
-            while (it.hasNextX()) {
-                if (enoughIssues)
-                    break;
+                        GridIterator<CacheDataRow> iterator = grpCtx.offheap().cachePartitionIterator(
+                            context.cacheId(),
+                            part.id(),
+                            mvccSnapshot,
+                            null
+                        );
 
-                CacheDataRow row = it.nextX();
+                        processPartIterator(grpCtx, partRes, session, iterator);
+                    }
+                }
+            }
+            else
+                processPartIterator(grpCtx, partRes, null, grpCtx.offheap().partitionIterator(part.id()));
 
-                if (skipConditions) {
-                    if (bothSkipConditions) {
-                        if (processedNumber > checkFirst)
+            PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter();
+
+            if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
+                throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
+                    ", grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "] changed during index validation " +
+                    "[before=" + updateCntrBefore + ", after=" + updateCntrAfter + "]");
+            }
+        }
+        catch (IgniteCheckedException e) {
+            error(log, "Failed to process partition [grpId=" + grpCtx.groupId() +
+                ", partId=" + part.id() + "]", e);
+
+            return emptyMap();
+        }
+        finally {
+            part.release();
+
+            printProgressOfIndexValidationIfNeeded();
+        }
+
+        PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
+
+        processedPartitions.incrementAndGet();
+
+        return Collections.singletonMap(partKey, partRes);
+    }
+
+    /**
+     * Process partition iterator.
+     *
+     * @param grpCtx Cache group context.
+     * @param partRes Result object.
+     * @param session H2 session.
+     * @param it Partition iterator.
+     * @throws IgniteCheckedException
+     */
+    private void processPartIterator(
+        CacheGroupContext grpCtx,
+        ValidateIndexesPartitionResult partRes,
+        Session session,
+        GridIterator<CacheDataRow> it
+    ) throws IgniteCheckedException {
+        boolean enoughIssues = false;
+
+        GridQueryProcessor qryProcessor = ignite.context().query();
+
+        final boolean skipConditions = checkFirst > 0 || checkThrough > 0;
+        final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0;
+
+        long current = 0;
+        long processedNumber = 0;
+
+        while (it.hasNextX()) {
+            if (enoughIssues)
+                break;
+
+            CacheDataRow row = it.nextX();
+
+            if (skipConditions) {
+                if (bothSkipConditions) {
+                    if (processedNumber > checkFirst)
+                        break;
+                    else if (current++ % checkThrough > 0)
+                        continue;
+                    else
+                        processedNumber++;
+                } else {
+                    if (checkFirst > 0) {
+                        if (current++ > checkFirst)
                             break;
-                        else if (current++ % checkThrough > 0)
+                    } else {
+                        if (current++ % checkThrough > 0)
                             continue;
-                        else
-                            processedNumber++;
-                    }
-                    else {
-                        if (checkFirst > 0) {
-                            if (current++ > checkFirst)
-                                break;
-                        }
-                        else {
-                            if (current++ % checkThrough > 0)
-                                continue;
-                        }
                     }
                 }
+            }
 
-                int cacheId = row.cacheId() == 0 ? grpCtx.groupId() : row.cacheId();
+            int cacheId = row.cacheId() == 0 ? grpCtx.groupId() : row.cacheId();
 
-                GridCacheContext cacheCtx = row.cacheId() == 0 ?
-                    grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(row.cacheId());
+            GridCacheContext<?, ?> cacheCtx = row.cacheId() == 0 ?
+                grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(row.cacheId());
 
-                if (cacheCtx == null)
-                    throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
+            if (cacheCtx == null)
+                throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
 
-                if (row.link() == 0L) {
-                    String errMsg = "Invalid partition row, possibly deleted";
+            if (row.link() == 0L) {
+                String errMsg = "Invalid partition row, possibly deleted";
 
-                    log.error(errMsg);
+                log.error(errMsg);
 
-                    IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null,
-                            new IgniteCheckedException(errMsg));
+                IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null,
+                        new IgniteCheckedException(errMsg));
 
-                    enoughIssues |= partRes.reportIssue(is);
+                enoughIssues |= partRes.reportIssue(is);
 
-                    continue;
-                }
+                continue;
+            }
 
-                QueryTypeDescriptorImpl res = qryProcessor.typeByValue(
-                    cacheCtx.name(),
-                    cacheCtx.cacheObjectContext(),
-                    row.key(),
-                    row.value(),
-                    true
-                );
+            QueryTypeDescriptorImpl res = qryProcessor.typeByValue(
+                cacheCtx.name(),
+                cacheCtx.cacheObjectContext(),
+                row.key(),
+                row.value(),
+                true
+            );
 
-                if (res == null)
-                    continue; // Tolerate - (k, v) is just not indexed.
+            if (res == null)
+                continue; // Tolerate - (k, v) is just not indexed.
 
-                IgniteH2Indexing indexing = (IgniteH2Indexing)qryProcessor.getIndexing();
+            IgniteH2Indexing indexing = (IgniteH2Indexing) qryProcessor.getIndexing();
 
-                GridH2Table gridH2Tbl = indexing.schemaManager().dataTable(cacheCtx.name(), res.tableName());
+            GridH2Table gridH2Tbl = indexing.schemaManager().dataTable(cacheCtx.name(), res.tableName());
 
-                if (gridH2Tbl == null)
-                    continue; // Tolerate - (k, v) is just not indexed.
+            if (gridH2Tbl == null)
+                continue; // Tolerate - (k, v) is just not indexed.
 
-                GridH2RowDescriptor gridH2RowDesc = gridH2Tbl.rowDescriptor();
+            GridH2RowDescriptor gridH2RowDesc = gridH2Tbl.rowDescriptor();
 
-                H2CacheRow h2Row = gridH2RowDesc.createRow(row);
+            H2CacheRow h2Row = gridH2RowDesc.createRow(row);
 
-                ArrayList<Index> indexes = gridH2Tbl.getIndexes();
+            ArrayList<Index> indexes = gridH2Tbl.getIndexes();
 
-                for (Index idx : indexes) {
-                    if (!(idx instanceof H2TreeIndexBase))
-                        continue;
+            for (Index idx : indexes) {
+                if (!(idx instanceof H2TreeIndexBase))
+                    continue;
 
-                    try {
-                        Cursor cursor = idx.find((Session)null, h2Row, h2Row);
+                try {
+                    Cursor cursor = idx.find(session, h2Row, h2Row);
 
-                        if (cursor == null || !cursor.next())
-                            throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index.");
-                    }
-                    catch (Throwable t) {
-                        Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
+                    if (cursor == null || !cursor.next())
+                        throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index.");
+                } catch (Throwable t) {
+                    Object o = CacheObjectUtils.unwrapBinaryIfNeeded(
                             grpCtx.cacheObjectContext(), row.key(), true, true);
 
-                        IndexValidationIssue is = new IndexValidationIssue(
+                    IndexValidationIssue is = new IndexValidationIssue(
                             o.toString(), cacheCtx.name(), idx.getName(), t);
 
-                        log.error("Failed to lookup key: " + is.toString(), t);
+                    log.error("Failed to lookup key: " + is.toString(), t);
 
-                        enoughIssues |= partRes.reportIssue(is);
-                    }
+                    enoughIssues |= partRes.reportIssue(is);
                 }
             }
+        }
+    }
 
-            PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter();
+    /**
+     * Get session with MVCC snapshot and QueryContext.
+     *
+     * @param cctx Cache context.
+     * @return Session with QueryContext and MVCC snapshot.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Session mvccSession(GridCacheContext<?, ?> cctx) throws IgniteCheckedException {
+        Session session = null;
 
-            if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) {
-                throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() +
-                    ", grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "] changed during index validation " +
-                    "[before=" + updateCntrBefore + ", after=" + updateCntrAfter + "]");
-            }
-        }
-        catch (IgniteCheckedException e) {
-            error(log, "Failed to process partition [grpId=" + grpCtx.groupId() +
-                ", partId=" + part.id() + "]", e);
+        boolean mvccEnabled = cctx.mvccEnabled();
 
-            return emptyMap();
-        }
-        finally {
-            part.release();
+        if (mvccEnabled) {
+            ConnectionManager connMgr = ((IgniteH2Indexing) ignite.context().query().getIndexing()).connections();
 
-            printProgressOfIndexValidationIfNeeded();
-        }
+            JdbcConnection connection = (JdbcConnection) connMgr.connection().connection();
 
-        PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
+            session = (Session) connection.getSession();
 
-        processedPartitions.incrementAndGet();
+            MvccQueryTracker tracker = MvccUtils.mvccTracker(cctx, true);
 
-        return Collections.singletonMap(partKey, partRes);
+            MvccSnapshot mvccSnapshot = tracker.snapshot();
+
+            final QueryContext qctx = new QueryContext(
+                0,
+                cacheName -> null,
+                null,
+                mvccSnapshot,
+                null,
+                true
+            );
+
+            session.setVariable(H2Utils.QCTX_VARIABLE_NAME, new H2Utils.ValueRuntimeSimpleObject<>(qctx));
+        }
+        return session;
     }
 
     /**
@@ -713,8 +794,8 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex
 
         Cursor cursor = null;
 
-        try {
-            cursor = idx.find((Session)null, null, null);
+        try (Session session = mvccSession(cacheCtxWithIdx.get1())) {
+            cursor = idx.find(session, null, null);
 
             if (cursor == null)
                 throw new IgniteCheckedException("Can't iterate through index: " + idx);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithMVCCTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithMVCCTest.java
new file mode 100644
index 0000000..f5c2440
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithMVCCTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.database;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Test index rebuild with MVCC enabled.
+ */
+public class RebuildIndexWithMVCCTest extends RebuildIndexTest {
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        final IgniteConfiguration configuration = super.getConfiguration(gridName);
+
+        for (CacheConfiguration<?, ?> cacheConfiguration : configuration.getCacheConfiguration())
+            cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT);
+
+        return configuration;
+    }
+
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
index 1074af3..f4a0ac9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.database.IgnitePersistentStoreSchem
 import org.apache.ignite.internal.processors.database.IgniteTwoRegionsRebuildIndexTest;
 import org.apache.ignite.internal.processors.database.RebuildIndexTest;
 import org.apache.ignite.internal.processors.database.RebuildIndexWithHistoricalRebalanceTest;
+import org.apache.ignite.internal.processors.database.RebuildIndexWithMVCCTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -53,6 +54,7 @@ import org.junit.runners.Suite;
     IndexingMultithreadedLoadContinuousRestartTest.class,
     LongDestroyDurableBackgroundTaskTest.class,
     RebuildIndexTest.class,
+    RebuildIndexWithMVCCTest.class,
     IgniteClusterSnapshotWithIndexesTest.class,
     ClientReconnectWithSqlTableConfiguredTest.class,
     MultipleParallelCacheDeleteDeadlockTest.class,

Reply via email to