This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 2a25ad9 IGNITE-13683 Support of MVCC-enabled caches added to index validation feature. - Fixes #8432. 2a25ad9 is described below commit 2a25ad92b1d9dc11f77834766f654002803938ba Author: Semyon Danilov <samvi...@yandex.ru> AuthorDate: Tue Nov 10 14:32:30 2020 +0300 IGNITE-13683 Support of MVCC-enabled caches added to index validation feature. - Fixes #8432. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../internal/processors/query/h2/H2Utils.java | 2 +- .../visor/verify/ValidateIndexesClosure.java | 265 ++++++++++++++------- .../database/RebuildIndexWithMVCCTest.java | 39 +++ .../testsuites/IgnitePdsWithIndexingTestSuite.java | 2 + 4 files changed, 215 insertions(+), 93 deletions(-) diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java index fcd27ce..233b272 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Utils.java @@ -114,7 +114,7 @@ import static org.apache.ignite.internal.processors.query.QueryUtils.VAL_FIELD_N */ public class H2Utils { /** Query context H2 variable name. */ - static final String QCTX_VARIABLE_NAME = "_IGNITE_QUERY_CONTEXT"; + public static final String QCTX_VARIABLE_NAME = "_IGNITE_QUERY_CONTEXT"; /** * The default precision for a char/varchar value. diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java index 1394687..d010702 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java @@ -48,6 +48,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; @@ -58,11 +61,14 @@ import org.apache.ignite.internal.processors.cache.verify.PartitionKey; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl; +import org.apache.ignite.internal.processors.query.h2.ConnectionManager; +import org.apache.ignite.internal.processors.query.h2.H2Utils; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.processors.query.h2.opt.H2CacheRow; +import org.apache.ignite.internal.processors.query.h2.opt.QueryContext; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -76,6 +82,7 @@ import org.apache.ignite.resources.LoggerResource; import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.Index; +import org.h2.jdbc.JdbcConnection; import org.h2.message.DbException; import org.jetbrains.annotations.Nullable; @@ -519,141 +526,215 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex PartitionUpdateCounter updateCntrBefore = updCntr == null ? null : updCntr.copy(); - GridIterator<CacheDataRow> it = grpCtx.offheap().partitionIterator(part.id()); - partRes = new ValidateIndexesPartitionResult(); - boolean enoughIssues = false; + boolean hasMvcc = grpCtx.caches().stream().anyMatch(GridCacheContext::mvccEnabled); - GridQueryProcessor qryProcessor = ignite.context().query(); + if (hasMvcc) { + for (GridCacheContext<?, ?> context : grpCtx.caches()) { + try (Session session = mvccSession(context)) { + MvccSnapshot mvccSnapshot = null; - final boolean skipConditions = checkFirst > 0 || checkThrough > 0; - final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0; + boolean mvccEnabled = context.mvccEnabled(); - long current = 0; - long processedNumber = 0; + if (mvccEnabled) + mvccSnapshot = ((QueryContext) session.getVariable(H2Utils.QCTX_VARIABLE_NAME).getObject()).mvccSnapshot(); - while (it.hasNextX()) { - if (enoughIssues) - break; + GridIterator<CacheDataRow> iterator = grpCtx.offheap().cachePartitionIterator( + context.cacheId(), + part.id(), + mvccSnapshot, + null + ); - CacheDataRow row = it.nextX(); + processPartIterator(grpCtx, partRes, session, iterator); + } + } + } + else + processPartIterator(grpCtx, partRes, null, grpCtx.offheap().partitionIterator(part.id())); - if (skipConditions) { - if (bothSkipConditions) { - if (processedNumber > checkFirst) + PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter(); + + if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) { + throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() + + ", grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "] changed during index validation " + + "[before=" + updateCntrBefore + ", after=" + updateCntrAfter + "]"); + } + } + catch (IgniteCheckedException e) { + error(log, "Failed to process partition [grpId=" + grpCtx.groupId() + + ", partId=" + part.id() + "]", e); + + return emptyMap(); + } + finally { + part.release(); + + printProgressOfIndexValidationIfNeeded(); + } + + PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName()); + + processedPartitions.incrementAndGet(); + + return Collections.singletonMap(partKey, partRes); + } + + /** + * Process partition iterator. + * + * @param grpCtx Cache group context. + * @param partRes Result object. + * @param session H2 session. + * @param it Partition iterator. + * @throws IgniteCheckedException + */ + private void processPartIterator( + CacheGroupContext grpCtx, + ValidateIndexesPartitionResult partRes, + Session session, + GridIterator<CacheDataRow> it + ) throws IgniteCheckedException { + boolean enoughIssues = false; + + GridQueryProcessor qryProcessor = ignite.context().query(); + + final boolean skipConditions = checkFirst > 0 || checkThrough > 0; + final boolean bothSkipConditions = checkFirst > 0 && checkThrough > 0; + + long current = 0; + long processedNumber = 0; + + while (it.hasNextX()) { + if (enoughIssues) + break; + + CacheDataRow row = it.nextX(); + + if (skipConditions) { + if (bothSkipConditions) { + if (processedNumber > checkFirst) + break; + else if (current++ % checkThrough > 0) + continue; + else + processedNumber++; + } else { + if (checkFirst > 0) { + if (current++ > checkFirst) break; - else if (current++ % checkThrough > 0) + } else { + if (current++ % checkThrough > 0) continue; - else - processedNumber++; - } - else { - if (checkFirst > 0) { - if (current++ > checkFirst) - break; - } - else { - if (current++ % checkThrough > 0) - continue; - } } } + } - int cacheId = row.cacheId() == 0 ? grpCtx.groupId() : row.cacheId(); + int cacheId = row.cacheId() == 0 ? grpCtx.groupId() : row.cacheId(); - GridCacheContext cacheCtx = row.cacheId() == 0 ? - grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(row.cacheId()); + GridCacheContext<?, ?> cacheCtx = row.cacheId() == 0 ? + grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(row.cacheId()); - if (cacheCtx == null) - throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId); + if (cacheCtx == null) + throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId); - if (row.link() == 0L) { - String errMsg = "Invalid partition row, possibly deleted"; + if (row.link() == 0L) { + String errMsg = "Invalid partition row, possibly deleted"; - log.error(errMsg); + log.error(errMsg); - IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null, - new IgniteCheckedException(errMsg)); + IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null, + new IgniteCheckedException(errMsg)); - enoughIssues |= partRes.reportIssue(is); + enoughIssues |= partRes.reportIssue(is); - continue; - } + continue; + } - QueryTypeDescriptorImpl res = qryProcessor.typeByValue( - cacheCtx.name(), - cacheCtx.cacheObjectContext(), - row.key(), - row.value(), - true - ); + QueryTypeDescriptorImpl res = qryProcessor.typeByValue( + cacheCtx.name(), + cacheCtx.cacheObjectContext(), + row.key(), + row.value(), + true + ); - if (res == null) - continue; // Tolerate - (k, v) is just not indexed. + if (res == null) + continue; // Tolerate - (k, v) is just not indexed. - IgniteH2Indexing indexing = (IgniteH2Indexing)qryProcessor.getIndexing(); + IgniteH2Indexing indexing = (IgniteH2Indexing) qryProcessor.getIndexing(); - GridH2Table gridH2Tbl = indexing.schemaManager().dataTable(cacheCtx.name(), res.tableName()); + GridH2Table gridH2Tbl = indexing.schemaManager().dataTable(cacheCtx.name(), res.tableName()); - if (gridH2Tbl == null) - continue; // Tolerate - (k, v) is just not indexed. + if (gridH2Tbl == null) + continue; // Tolerate - (k, v) is just not indexed. - GridH2RowDescriptor gridH2RowDesc = gridH2Tbl.rowDescriptor(); + GridH2RowDescriptor gridH2RowDesc = gridH2Tbl.rowDescriptor(); - H2CacheRow h2Row = gridH2RowDesc.createRow(row); + H2CacheRow h2Row = gridH2RowDesc.createRow(row); - ArrayList<Index> indexes = gridH2Tbl.getIndexes(); + ArrayList<Index> indexes = gridH2Tbl.getIndexes(); - for (Index idx : indexes) { - if (!(idx instanceof H2TreeIndexBase)) - continue; + for (Index idx : indexes) { + if (!(idx instanceof H2TreeIndexBase)) + continue; - try { - Cursor cursor = idx.find((Session)null, h2Row, h2Row); + try { + Cursor cursor = idx.find(session, h2Row, h2Row); - if (cursor == null || !cursor.next()) - throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index."); - } - catch (Throwable t) { - Object o = CacheObjectUtils.unwrapBinaryIfNeeded( + if (cursor == null || !cursor.next()) + throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index."); + } catch (Throwable t) { + Object o = CacheObjectUtils.unwrapBinaryIfNeeded( grpCtx.cacheObjectContext(), row.key(), true, true); - IndexValidationIssue is = new IndexValidationIssue( + IndexValidationIssue is = new IndexValidationIssue( o.toString(), cacheCtx.name(), idx.getName(), t); - log.error("Failed to lookup key: " + is.toString(), t); + log.error("Failed to lookup key: " + is.toString(), t); - enoughIssues |= partRes.reportIssue(is); - } + enoughIssues |= partRes.reportIssue(is); } } + } + } - PartitionUpdateCounter updateCntrAfter = part.dataStore().partUpdateCounter(); + /** + * Get session with MVCC snapshot and QueryContext. + * + * @param cctx Cache context. + * @return Session with QueryContext and MVCC snapshot. + * @throws IgniteCheckedException If failed. + */ + private Session mvccSession(GridCacheContext<?, ?> cctx) throws IgniteCheckedException { + Session session = null; - if (updateCntrAfter != null && !updateCntrAfter.equals(updateCntrBefore)) { - throw new GridNotIdleException(GRID_NOT_IDLE_MSG + "[grpName=" + grpCtx.cacheOrGroupName() + - ", grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "] changed during index validation " + - "[before=" + updateCntrBefore + ", after=" + updateCntrAfter + "]"); - } - } - catch (IgniteCheckedException e) { - error(log, "Failed to process partition [grpId=" + grpCtx.groupId() + - ", partId=" + part.id() + "]", e); + boolean mvccEnabled = cctx.mvccEnabled(); - return emptyMap(); - } - finally { - part.release(); + if (mvccEnabled) { + ConnectionManager connMgr = ((IgniteH2Indexing) ignite.context().query().getIndexing()).connections(); - printProgressOfIndexValidationIfNeeded(); - } + JdbcConnection connection = (JdbcConnection) connMgr.connection().connection(); - PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName()); + session = (Session) connection.getSession(); - processedPartitions.incrementAndGet(); + MvccQueryTracker tracker = MvccUtils.mvccTracker(cctx, true); - return Collections.singletonMap(partKey, partRes); + MvccSnapshot mvccSnapshot = tracker.snapshot(); + + final QueryContext qctx = new QueryContext( + 0, + cacheName -> null, + null, + mvccSnapshot, + null, + true + ); + + session.setVariable(H2Utils.QCTX_VARIABLE_NAME, new H2Utils.ValueRuntimeSimpleObject<>(qctx)); + } + return session; } /** @@ -713,8 +794,8 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex Cursor cursor = null; - try { - cursor = idx.find((Session)null, null, null); + try (Session session = mvccSession(cacheCtxWithIdx.get1())) { + cursor = idx.find(session, null, null); if (cursor == null) throw new IgniteCheckedException("Can't iterate through index: " + idx); diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithMVCCTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithMVCCTest.java new file mode 100644 index 0000000..f5c2440 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/database/RebuildIndexWithMVCCTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.database; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Test index rebuild with MVCC enabled. + */ +public class RebuildIndexWithMVCCTest extends RebuildIndexTest { + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + final IgniteConfiguration configuration = super.getConfiguration(gridName); + + for (CacheConfiguration<?, ?> cacheConfiguration : configuration.getCacheConfiguration()) + cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT); + + return configuration; + } + +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java index 1074af3..f4a0ac9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingTestSuite.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.database.IgnitePersistentStoreSchem import org.apache.ignite.internal.processors.database.IgniteTwoRegionsRebuildIndexTest; import org.apache.ignite.internal.processors.database.RebuildIndexTest; import org.apache.ignite.internal.processors.database.RebuildIndexWithHistoricalRebalanceTest; +import org.apache.ignite.internal.processors.database.RebuildIndexWithMVCCTest; import org.junit.runner.RunWith; import org.junit.runners.Suite; @@ -53,6 +54,7 @@ import org.junit.runners.Suite; IndexingMultithreadedLoadContinuousRestartTest.class, LongDestroyDurableBackgroundTaskTest.class, RebuildIndexTest.class, + RebuildIndexWithMVCCTest.class, IgniteClusterSnapshotWithIndexesTest.class, ClientReconnectWithSqlTableConfiguredTest.class, MultipleParallelCacheDeleteDeadlockTest.class,