IGNITE-9753 Several optimizations of validate_indexes - Fixes #5063. Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fe02df8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fe02df8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fe02df8 Branch: refs/heads/ignite-627 Commit: 1fe02df8daa94ca0761049cd41659f4ae18f580c Parents: 594aac8 Author: Ivan Daschinskiy <ivanda...@gmail.com> Authored: Mon Oct 29 12:14:54 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Mon Oct 29 12:14:54 2018 +0300 ---------------------------------------------------------------------- .../internal/commandline/CommandHandler.java | 34 ++- .../visor/verify/IndexIntegrityCheckIssue.java | 74 +++++++ .../verify/VisorValidateIndexesJobResult.java | 30 ++- .../resources/META-INF/classnames.properties | 1 + .../visor/verify/ValidateIndexesClosure.java | 205 ++++++++++++++++--- .../util/GridCommandHandlerIndexingTest.java | 152 ++++++++------ 6 files changed, 398 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java index 35981b5..c1853f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java @@ -95,6 +95,7 @@ import org.apache.ignite.internal.visor.tx.VisorTxSortOrder; import org.apache.ignite.internal.visor.tx.VisorTxTask; import org.apache.ignite.internal.visor.tx.VisorTxTaskArg; import org.apache.ignite.internal.visor.tx.VisorTxTaskResult; +import 
org.apache.ignite.internal.visor.verify.IndexIntegrityCheckIssue; import org.apache.ignite.internal.visor.verify.IndexValidationIssue; import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult; import org.apache.ignite.internal.visor.verify.VisorContentionTask; @@ -780,9 +781,20 @@ public class CommandHandler { } } - boolean errors = false; - for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry : taskRes.results().entrySet()) { + boolean errors = false; + + log("validate_indexes result on node " + nodeEntry.getKey() + ":"); + + Collection<IndexIntegrityCheckIssue> integrityCheckFailures = nodeEntry.getValue().integrityCheckFailures(); + + if (!integrityCheckFailures.isEmpty()) { + errors = true; + + for (IndexIntegrityCheckIssue is : integrityCheckFailures) + log("\t" + is.toString()); + } + Map<PartitionKey, ValidateIndexesPartitionResult> partRes = nodeEntry.getValue().partitionResult(); for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : partRes.entrySet()) { @@ -791,10 +803,10 @@ public class CommandHandler { if (!res.issues().isEmpty()) { errors = true; - log(e.getKey().toString() + " " + e.getValue().toString()); + log("\t" + e.getKey().toString() + " " + e.getValue().toString()); for (IndexValidationIssue is : res.issues()) - log(is.toString()); + log("\t\t" + is.toString()); } } @@ -806,18 +818,18 @@ public class CommandHandler { if (!res.issues().isEmpty()) { errors = true; - log("SQL Index " + e.getKey() + " " + e.getValue().toString()); + log("\tSQL Index " + e.getKey() + " " + e.getValue().toString()); for (IndexValidationIssue is : res.issues()) - log(is.toString()); + log("\t\t" + is.toString()); } } - } - if (!errors) - log("validate_indexes has finished, no issues found."); - else - log("validate_indexes has finished with errors (listed above)."); + if (!errors) + log("no issues found.\n"); + else + log("issues found (listed above).\n"); + } } /** 
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java new file mode 100644 index 0000000..ec6e5b2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/IndexIntegrityCheckIssue.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.visor.verify; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.internal.visor.VisorDataTransferObject; + +/** + * + */ +public class IndexIntegrityCheckIssue extends VisorDataTransferObject { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache group name. 
*/ + private String grpName; + + /** T. */ + @GridToStringExclude + private Throwable t; + + /** + * + */ + public IndexIntegrityCheckIssue() { + // Default constructor required for Externalizable. + } + + /** + * @param grpName Group name. + * @param t Data integrity check error. + */ + public IndexIntegrityCheckIssue(String grpName, Throwable t) { + this.grpName = grpName; + this.t = t; + } + + /** {@inheritDoc} */ + @Override protected void writeExternalData(ObjectOutput out) throws IOException { + U.writeString(out, this.grpName); + out.writeObject(t); + } + + /** {@inheritDoc} */ + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { + this.grpName = U.readString(in); + this.t = (Throwable)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IndexIntegrityCheckIssue.class, this) + ", " + t.getClass() + ": " + t.getMessage(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java index aa74323..f84fc1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java @@ -20,11 +20,14 @@ package org.apache.ignite.internal.visor.verify; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import org.apache.ignite.internal.processors.cache.verify.PartitionKey; import 
org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorDataTransferObject; +import org.jetbrains.annotations.NotNull; /** * @@ -39,14 +42,22 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject { /** Results of reverse indexes validation from node. */ private Map<String, ValidateIndexesPartitionResult> idxRes; + /** Integrity check issues. */ + private Collection<IndexIntegrityCheckIssue> integrityCheckFailures; + /** * @param partRes Results of indexes validation from node. * @param idxRes Results of reverse indexes validation from node. + * @param integrityCheckFailures Collection of indexes integrity check failures. */ - public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult> partRes, - Map<String, ValidateIndexesPartitionResult> idxRes) { + public VisorValidateIndexesJobResult( + @NotNull Map<PartitionKey, ValidateIndexesPartitionResult> partRes, + @NotNull Map<String, ValidateIndexesPartitionResult> idxRes, + @NotNull Collection<IndexIntegrityCheckIssue> integrityCheckFailures + ) { this.partRes = partRes; this.idxRes = idxRes; + this.integrityCheckFailures = integrityCheckFailures; } /** @@ -57,7 +68,7 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject { /** {@inheritDoc} */ @Override public byte getProtocolVersion() { - return V2; + return V3; } /** @@ -71,13 +82,21 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject { * @return Results of reverse indexes validation from node. */ public Map<String, ValidateIndexesPartitionResult> indexResult() { - return idxRes; + return idxRes == null ? Collections.emptyMap() : idxRes; + } + + /** + * @return Collection of failed integrity checks. + */ + public Collection<IndexIntegrityCheckIssue> integrityCheckFailures() { + return integrityCheckFailures == null ? 
Collections.emptyList() : integrityCheckFailures; } /** {@inheritDoc} */ @Override protected void writeExternalData(ObjectOutput out) throws IOException { U.writeMap(out, partRes); U.writeMap(out, idxRes); + U.writeCollection(out, integrityCheckFailures); } /** {@inheritDoc} */ @@ -86,6 +105,9 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject { if (protoVer >= V2) idxRes = U.readMap(in); + + if (protoVer >= V3) + integrityCheckFailures = U.readCollection(in); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index a0e2ba7..fda0fe4 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -2206,6 +2206,7 @@ org.apache.ignite.internal.visor.util.VisorEventMapper org.apache.ignite.internal.visor.util.VisorExceptionWrapper org.apache.ignite.internal.visor.util.VisorTaskUtils$4 org.apache.ignite.internal.visor.verify.IndexValidationIssue +org.apache.ignite.internal.visor.verify.IndexIntegrityCheckIssue org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult org.apache.ignite.internal.visor.verify.VisorContentionJobResult org.apache.ignite.internal.visor.verify.VisorContentionTask http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java index 503b57c..ec02c25 
100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.visor.verify; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -37,22 +39,29 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.PageIdAllocator; +import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.verify.PartitionKey; import 
org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; @@ -104,9 +113,15 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex /** Counter of processed indexes. */ private final AtomicInteger processedIndexes = new AtomicInteger(0); + /** Counter of integrity checked indexes. */ + private final AtomicInteger integrityCheckedIndexes = new AtomicInteger(0); + /** Total partitions. */ private volatile int totalIndexes; + /** Total cache groups. */ + private volatile int totalCacheGrps; + /** Last progress print timestamp. 
*/ private final AtomicLong lastProgressPrintTs = new AtomicLong(0); @@ -182,10 +197,14 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>(); List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>(); + totalCacheGrps = grpIds.size(); + + Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = integrityCheckIndexesPartitions(grpIds); + for (Integer grpId : grpIds) { CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId); - if (grpCtx == null) + if (grpCtx == null || integrityCheckResults.containsKey(grpId)) continue; List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions(); @@ -210,7 +229,8 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex ArrayList<Index> indexes = gridH2Tbl.getIndexes(); for (Index idx : indexes) - idxArgs.add(new T2<>(ctx, idx)); + if (idx instanceof H2TreeIndex) + idxArgs.add(new T2<>(ctx, idx)); } } } @@ -220,15 +240,15 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex Collections.shuffle(partArgs); Collections.shuffle(idxArgs); + totalPartitions = partArgs.size(); + totalIndexes = idxArgs.size(); + for (T2<CacheGroupContext, GridDhtLocalPartition> t2 : partArgs) procPartFutures.add(processPartitionAsync(t2.get1(), t2.get2())); for (T2<GridCacheContext, Index> t2 : idxArgs) procIdxFutures.add(processIndexAsync(t2.get1(), t2.get2())); - totalPartitions = procPartFutures.size(); - totalIndexes = procIdxFutures.size(); - Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>(); Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>(); @@ -250,6 +270,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex idxResults.putAll(idxRes); } + + log.warning("ValidateIndexesClosure finished: processed " + totalPartitions + " partitions and " + + totalIndexes + " 
indexes."); } catch (InterruptedException | ExecutionException e) { for (int j = curPart; j < procPartFutures.size(); j++) @@ -258,15 +281,102 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex for (int j = curIdx; j < procIdxFutures.size(); j++) procIdxFutures.get(j).cancel(false); - if (e instanceof InterruptedException) - throw new IgniteInterruptedException((InterruptedException)e); - else if (e.getCause() instanceof IgniteException) - throw (IgniteException)e.getCause(); - else - throw new IgniteException(e.getCause()); + throw unwrapFutureException(e); + } + + return new VisorValidateIndexesJobResult(partResults, idxResults, integrityCheckResults.values()); + } + + /** + * @param grpIds Group ids. + */ + private Map<Integer, IndexIntegrityCheckIssue> integrityCheckIndexesPartitions(Set<Integer> grpIds) { + List<Future<T2<Integer, IndexIntegrityCheckIssue>>> integrityCheckFutures = new ArrayList<>(grpIds.size()); + + for (Integer grpId: grpIds) { + final CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId); + + if (grpCtx == null) { + integrityCheckedIndexes.incrementAndGet(); + + continue; + } + + Future<T2<Integer, IndexIntegrityCheckIssue>> checkFut = + calcExecutor.submit(new Callable<T2<Integer, IndexIntegrityCheckIssue>>() { + @Override public T2<Integer, IndexIntegrityCheckIssue> call() throws Exception { + IndexIntegrityCheckIssue issue = integrityCheckIndexPartition(grpCtx); + + return new T2<>(grpCtx.groupId(), issue); + } + }); + + integrityCheckFutures.add(checkFut); + } + + Map<Integer, IndexIntegrityCheckIssue> integrityCheckResults = new HashMap<>(); + + int curFut = 0; + try { + for (Future<T2<Integer, IndexIntegrityCheckIssue>> fut : integrityCheckFutures) { + T2<Integer, IndexIntegrityCheckIssue> res = fut.get(); + + if (res.getValue() != null) + integrityCheckResults.put(res.getKey(), res.getValue()); + } + } + catch (InterruptedException | ExecutionException e) { + for (int j = curFut; j < 
integrityCheckFutures.size(); j++) + integrityCheckFutures.get(j).cancel(false); + + throw unwrapFutureException(e); + } + + return integrityCheckResults; + } + + /** + * @param gctx Cache group context. + */ + private IndexIntegrityCheckIssue integrityCheckIndexPartition(CacheGroupContext gctx) { + GridKernalContext ctx = ignite.context(); + GridCacheSharedContext cctx = ctx.cache().context(); + + try { + FilePageStoreManager pageStoreMgr = (FilePageStoreManager)cctx.pageStore(); + + if (pageStoreMgr == null) + return null; + + int pageSz = gctx.dataRegion().pageMemory().pageSize(); + + PageStore pageStore = pageStoreMgr.getStore(gctx.groupId(), PageIdAllocator.INDEX_PARTITION); + + long pageId = PageIdUtils.pageId(PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX, 0); + + ByteBuffer buf = ByteBuffer.allocateDirect(pageSz); + + buf.order(ByteOrder.nativeOrder()); + + for (int pageNo = 0; pageNo < pageStore.pages(); pageId++, pageNo++) { + buf.clear(); + + pageStore.read(pageId, buf, true); + } + + return null; } + catch (Throwable t) { + log.error("Integrity check of index partition of cache group " + gctx.cacheOrGroupName() + " failed", t); + + return new IndexIntegrityCheckIssue(gctx.cacheOrGroupName(), t); + } + finally { + integrityCheckedIndexes.incrementAndGet(); - return new VisorValidateIndexesJobResult(partResults, idxResults); + printProgressIfNeeded("Current progress of ValidateIndexesClosure: checked integrity of " + + integrityCheckedIndexes.get() + " index partitions of " + totalCacheGrps + " cache groups"); + } } /** @@ -371,6 +481,19 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex if (cacheCtx == null) throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId); + if (row.link() == 0L) { + String errMsg = "Invalid partition row, possibly deleted"; + + log.error(errMsg); + + IndexValidationIssue is = new IndexValidationIssue(null, cacheCtx.name(), null, + new IgniteCheckedException(errMsg)); + 
+ enoughIssues |= partRes.reportIssue(is); + + continue; + } + try { QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke( qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(), row.value(), true); @@ -392,6 +515,9 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex ArrayList<Index> indexes = gridH2Tbl.getIndexes(); for (Index idx : indexes) { + if (!(idx instanceof H2TreeIndex)) + continue; + try { Cursor cursor = idx.find((Session) null, h2Row, h2Row); @@ -434,7 +560,7 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex finally { part.release(); - printProgressIfNeeded(); + printProgressOfIndexValidationIfNeeded(); } PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName()); @@ -447,16 +573,21 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex /** * */ - private void printProgressIfNeeded() { - long curTs = U.currentTimeMillis(); + private void printProgressOfIndexValidationIfNeeded() { + printProgressIfNeeded("Current progress of ValidateIndexesClosure: processed " + + processedPartitions.get() + " of " + totalPartitions + " partitions, " + + processedIndexes.get() + " of " + totalIndexes + " SQL indexes"); + } + /** + * + */ + private void printProgressIfNeeded(String msg) { + long curTs = U.currentTimeMillis(); long lastTs = lastProgressPrintTs.get(); - if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs)) { - log.warning("Current progress of ValidateIndexesClosure: processed " + - processedPartitions.get() + " of " + totalPartitions + " partitions, " + - processedIndexes.get() + " of " + totalIndexes + " SQL indexes"); - } + if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs)) + log.warning(msg); } /** @@ -546,12 +677,14 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex h2key = h2Row.key(); - CacheDataRow 
cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key); + if (h2Row.link() != 0L) { + CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key); - if (cacheDataStoreRow == null) - throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree."); - - previousKey = h2key; + if (cacheDataStoreRow == null) + throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree."); + } + else + throw new IgniteCheckedException("Invalid index row, possibly deleted " + h2Row); } catch (Throwable t) { Object o = CacheObjectUtils.unwrapBinaryIfNeeded( @@ -564,14 +697,34 @@ public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndex enoughIssues |= idxValidationRes.reportIssue(is); } + finally { + previousKey = h2key; + } } String uniqueIdxName = "[cache=" + ctx.name() + ", idx=" + idx.getName() + "]"; processedIndexes.incrementAndGet(); - printProgressIfNeeded(); + printProgressOfIndexValidationIfNeeded(); return Collections.singletonMap(uniqueIdxName, idxValidationRes); } + + /** + * @param e Future result exception. + * @return Unwrapped exception. 
+ */ + private IgniteException unwrapFutureException(Exception e) { + assert e instanceof InterruptedException || e instanceof ExecutionException : "Expecting either InterruptedException " + + "or ExecutionException"; + + if (e instanceof InterruptedException) + return new IgniteInterruptedException((InterruptedException)e); + else if (e.getCause() instanceof IgniteException) + return (IgniteException)e.getCause(); + else + return new IgniteException(e.getCause()); + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/1fe02df8/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java index 48e94c1..c7693d2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java @@ -17,6 +17,9 @@ package org.apache.ignite.util; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; @@ -24,8 +27,8 @@ import java.util.List; import java.util.concurrent.ThreadLocalRandom; import javax.cache.Cache; import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.QueryEntity; @@ -39,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import 
org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.tree.SearchRow; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.lang.GridIterator; @@ -47,57 +51,35 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME; /** * */ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest { + /** Test cache name. */ + private static final String CACHE_NAME = "persons-cache-vi"; + /** * Tests that validation doesn't fail if nothing is broken. */ public void testValidateIndexesNoErrors() throws Exception { - Ignite ignite = startGrids(2); - - ignite.cluster().active(true); - - Ignite client = startGrid("client"); - - String cacheName = "persons-cache-vi"; - - IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName); - - ThreadLocalRandom rand = ThreadLocalRandom.current(); - - for (int i = 0; i < 10_000; i++) - personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong()))); + prepareGridForTest(); injectTestSystemOut(); - assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName)); + assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME)); - assertTrue(testOut.toString().contains("validate_indexes has finished, no issues found")); + assertTrue(testOut.toString().contains("no issues found")); } /** * Tests that missing rows in CacheDataTree are detected. 
*/ public void testBrokenCacheDataTreeShouldFailValidation() throws Exception { - Ignite ignite = startGrids(2); + Ignite ignite = prepareGridForTest(); - ignite.cluster().active(true); - - Ignite client = startGrid("client"); - - String cacheName = "persons-cache-vi"; - - IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName); - - ThreadLocalRandom rand = ThreadLocalRandom.current(); - - for (int i = 0; i < 10_000; i++) - personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong()))); - - breakCacheDataTree(ignite, cacheName, 1); + breakCacheDataTree(ignite, CACHE_NAME, 1); injectTestSystemOut(); @@ -105,11 +87,11 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest { execute( "--cache", "validate_indexes", - cacheName, + CACHE_NAME, "checkFirst", "10000", "checkThrough", "10")); - assertTrue(testOut.toString().contains("validate_indexes has finished with errors")); + assertTrue(testOut.toString().contains("issues found (listed above)")); assertTrue(testOut.toString().contains( "Key is present in SQL index, but is missing in corresponding data page.")); @@ -119,6 +101,46 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest { * Tests that missing rows in H2 indexes are detected. */ public void testBrokenSqlIndexShouldFailValidation() throws Exception { + Ignite ignite = prepareGridForTest(); + + breakSqlIndex(ignite, CACHE_NAME); + + injectTestSystemOut(); + + assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME)); + + assertTrue(testOut.toString().contains("issues found (listed above)")); + } + + /** + * Tests that missing rows in H2 indexes are detected. 
+ */ + public void testCorruptedIndexPartitionShouldFailValidation() throws Exception { + Ignite ignite = prepareGridForTest(); + + forceCheckpoint(); + + File idxPath = indexPartition(ignite, CACHE_NAME); + + stopAllGrids(); + + corruptIndexPartition(idxPath); + + startGrids(2); + + awaitPartitionMapExchange(); + + injectTestSystemOut(); + + assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", CACHE_NAME)); + + assertTrue(testOut.toString().contains("issues found (listed above)")); + } + + /** + * + */ + private Ignite prepareGridForTest() throws Exception{ Ignite ignite = startGrids(2); ignite.cluster().active(true); @@ -127,20 +149,52 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest { String cacheName = "persons-cache-vi"; - IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName); + client.getOrCreateCache(new CacheConfiguration<Integer, Person>() + .setName(cacheName) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setBackups(1) + .setQueryEntities(F.asList(personEntity(true, true))) + .setAffinity(new RendezvousAffinityFunction(false, 32))); ThreadLocalRandom rand = ThreadLocalRandom.current(); - for (int i = 0; i < 10_000; i++) - personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong()))); + try (IgniteDataStreamer<Integer, Person> streamer = client.dataStreamer(CACHE_NAME);) { + for (int i = 0; i < 10_000; i++) + streamer.addData(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong()))); + } - breakSqlIndex(ignite, cacheName); + return ignite; + } - injectTestSystemOut(); + /** + * Get index partition file for specific node and cache. 
+ */ + private File indexPartition(Ignite ig, String cacheName) { + IgniteEx ig0 = (IgniteEx)ig; - assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName)); + FilePageStoreManager pageStoreManager = ((FilePageStoreManager) ig0.context().cache().context().pageStore()); - assertTrue(testOut.toString().contains("validate_indexes has finished with errors")); + return new File(pageStoreManager.cacheWorkDir(false, cacheName), INDEX_FILE_NAME); + } + + /** + * Write some random trash in index partition. + */ + private void corruptIndexPartition(File path) throws IOException { + assertTrue(path.exists()); + + ThreadLocalRandom rand = ThreadLocalRandom.current(); + + try (RandomAccessFile idx = new RandomAccessFile(path, "rw")) { + byte[] trash = new byte[1024]; + + rand.nextBytes(trash); + + idx.seek(4096); + + idx.write(trash); + } } /** @@ -242,22 +296,6 @@ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest { } /** - * Dynamically creates cache with SQL indexes. - * - * @param ig Client. - * @param cacheName Cache name. - */ - private IgniteCache<Integer, Person> createPersonCache(Ignite ig, String cacheName) { - return ig.getOrCreateCache(new CacheConfiguration<Integer, Person>() - .setName(cacheName) - .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) - .setAtomicityMode(CacheAtomicityMode.ATOMIC) - .setBackups(1) - .setQueryEntities(F.asList(personEntity(true, true))) - .setAffinity(new RendezvousAffinityFunction(false, 32))); - } - - /** * @param idxName Index name. * @param idxOrgId Index org id. */