Repository: ignite Updated Branches: refs/heads/master 771e8619e -> 88a6bfdd9
IGNITE-8499 validate_indexes command doesn't detect absent rows in cache data tree Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88a6bfdd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88a6bfdd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88a6bfdd Branch: refs/heads/master Commit: 88a6bfdd9c7faea3d230b9959c773900b94356b1 Parents: 771e861 Author: Ivan Rakov <[email protected]> Authored: Wed May 16 20:13:30 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Wed May 16 20:13:30 2018 +0300 ---------------------------------------------------------------------- .../internal/commandline/CommandHandler.java | 21 +- .../verify/ValidateIndexesPartitionResult.java | 31 ++- .../verify/VisorValidateIndexesJobResult.java | 38 ++- .../visor/verify/ValidateIndexesClosure.java | 264 ++++++++++++++----- .../visor/verify/VisorValidateIndexesTask.java | 6 +- .../IgniteCacheWithIndexingTestSuite.java | 3 + .../util/GridCommandHandlerIndexingTest.java | 203 +++++++++++++- 7 files changed, 477 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java index 7d457fd..04578e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java @@ -100,8 +100,8 @@ import static org.apache.ignite.internal.commandline.Command.BASELINE; import static org.apache.ignite.internal.commandline.Command.CACHE; import static org.apache.ignite.internal.commandline.Command.DEACTIVATE; import static org.apache.ignite.internal.commandline.Command.STATE; -import static org.apache.ignite.internal.commandline.Command.WAL; import static org.apache.ignite.internal.commandline.Command.TX; +import static org.apache.ignite.internal.commandline.Command.WAL; import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.ADD; import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.COLLECT; import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.REMOVE; @@ -635,9 +635,9 @@ public class CommandHandler { boolean errors = false; for (Map.Entry<UUID, VisorValidateIndexesJobResult> nodeEntry : taskRes.results().entrySet()) { - Map<PartitionKey, ValidateIndexesPartitionResult> map = nodeEntry.getValue().response(); + Map<PartitionKey, ValidateIndexesPartitionResult> partRes = nodeEntry.getValue().partitionResult(); - for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : map.entrySet()) { + for (Map.Entry<PartitionKey, ValidateIndexesPartitionResult> e : partRes.entrySet()) { ValidateIndexesPartitionResult res = e.getValue(); if (!res.issues().isEmpty()) { @@ -649,6 +649,21 @@ public class CommandHandler { log(is.toString()); } } + + Map<String, ValidateIndexesPartitionResult> idxRes = nodeEntry.getValue().indexResult(); + + for (Map.Entry<String, ValidateIndexesPartitionResult> e : idxRes.entrySet()) { + ValidateIndexesPartitionResult res = e.getValue(); + + if (!res.issues().isEmpty()) { + errors = true; + + log("SQL Index " + e.getKey() + " " + e.getValue().toString()); + + for (IndexValidationIssue is : res.issues()) + log(is.toString()); + } + } } if (!errors) http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java index 1889960..5d74a57 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesPartitionResult.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorDataTransferObject; /** - * + * Encapsulates intermediate results of validation of SQL index (if {@link #sqlIdxName} is present) or partition. */ public class ValidateIndexesPartitionResult extends VisorDataTransferObject { /** */ @@ -52,6 +52,10 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject { @GridToStringExclude private List<IndexValidationIssue> issues = new ArrayList<>(10); + /** Sql index name. */ + @GridToStringExclude + private String sqlIdxName; + /** * */ @@ -64,12 +68,15 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject { * @param size Size. * @param isPrimary Is primary. * @param consistentId Consistent id. + * @param sqlIdxName Sql index name (optional). */ - public ValidateIndexesPartitionResult(long updateCntr, long size, boolean isPrimary, Object consistentId) { + public ValidateIndexesPartitionResult(long updateCntr, long size, boolean isPrimary, Object consistentId, + String sqlIdxName) { this.updateCntr = updateCntr; this.size = size; this.isPrimary = isPrimary; this.consistentId = consistentId; + this.sqlIdxName = sqlIdxName; } /** @@ -108,6 +115,13 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject { } /** + * @return <code>null</code> for partition validation result, SQL index name for index validation result + */ + public String sqlIndexName() { + return sqlIdxName; + } + + /** * @param t Issue. * @return True if there are already enough issues. */ @@ -121,12 +135,18 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject { } /** {@inheritDoc} */ + @Override public byte getProtocolVersion() { + return V2; + } + + /** {@inheritDoc} */ @Override protected void writeExternalData(ObjectOutput out) throws IOException { out.writeLong(updateCntr); out.writeLong(size); out.writeBoolean(isPrimary); out.writeObject(consistentId); U.writeCollection(out, issues); + U.writeString(out, sqlIdxName); } /** {@inheritDoc} */ @@ -136,10 +156,15 @@ public class ValidateIndexesPartitionResult extends VisorDataTransferObject { isPrimary = in.readBoolean(); consistentId = in.readObject(); issues = U.readList(in); + + if (protoVer >= V2) + sqlIdxName = U.readString(in); } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(ValidateIndexesPartitionResult.class, this); + return sqlIdxName == null ? S.toString(ValidateIndexesPartitionResult.class, this) : + ValidateIndexesPartitionResult.class.getSimpleName() + " [consistentId=" + consistentId + + ", sqlIdxName=" + sqlIdxName + "]"; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java index 25c97b6..aa74323 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesJobResult.java @@ -34,13 +34,19 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject { private static final long serialVersionUID = 0L; /** Results of indexes validation from node. */ - private Map<PartitionKey, ValidateIndexesPartitionResult> res; + private Map<PartitionKey, ValidateIndexesPartitionResult> partRes; + + /** Results of reverse indexes validation from node. */ + private Map<String, ValidateIndexesPartitionResult> idxRes; /** - * @param res Results of indexes validation from node. + * @param partRes Results of indexes validation from node. + * @param idxRes Results of reverse indexes validation from node. */ - public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult> res) { - this.res = res; + public VisorValidateIndexesJobResult(Map<PartitionKey, ValidateIndexesPartitionResult> partRes, + Map<String, ValidateIndexesPartitionResult> idxRes) { + this.partRes = partRes; + this.idxRes = idxRes; } /** @@ -49,21 +55,37 @@ public class VisorValidateIndexesJobResult extends VisorDataTransferObject { public VisorValidateIndexesJobResult() { } + /** {@inheritDoc} */ + @Override public byte getProtocolVersion() { + return V2; + } + /** * @return Results of indexes validation from node. */ - public Map<PartitionKey, ValidateIndexesPartitionResult> response() { - return res; + public Map<PartitionKey, ValidateIndexesPartitionResult> partitionResult() { + return partRes; + } + + /** + * @return Results of reverse indexes validation from node. + */ + public Map<String, ValidateIndexesPartitionResult> indexResult() { + return idxRes; } /** {@inheritDoc} */ @Override protected void writeExternalData(ObjectOutput out) throws IOException { - U.writeMap(out, res); + U.writeMap(out, partRes); + U.writeMap(out, idxRes); } /** {@inheritDoc} */ @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { - res = U.readMap(in); + partRes = U.readMap(in); + + if (protoVer >= V2) + idxRes = U.readMap(in); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java index 373bd15..e0eff61 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/ValidateIndexesClosure.java @@ -31,9 +31,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; @@ -51,12 +50,15 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.verify.PartitionKey; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.lang.GridIterator; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.resources.IgniteInstanceResource; @@ -66,9 +68,13 @@ import org.h2.index.Cursor; import org.h2.index.Index; /** - * + * Closure that locally validates indexes of given caches. + * Validation consists of three checks: + * 1. If entry is present in cache data tree, it's reachable from all cache SQL indexes + * 2. If entry is present in cache SQL index, it can be dereferenced with link from index + * 3. If entry is present in cache SQL index, it's present in cache data tree */ -public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, ValidateIndexesPartitionResult>> { +public class ValidateIndexesClosure implements IgniteCallable<VisorValidateIndexesJobResult> { /** */ private static final long serialVersionUID = 0L; @@ -84,7 +90,19 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, private Set<String> cacheNames; /** Counter of processed partitions. */ - private final AtomicInteger completionCntr = new AtomicInteger(0); + private final AtomicInteger processedPartitions = new AtomicInteger(0); + + /** Total partitions. */ + private volatile int totalPartitions; + + /** Counter of processed indexes. */ + private final AtomicInteger processedIndexes = new AtomicInteger(0); + + /** Total partitions. */ + private volatile int totalIndexes; + + /** Last progress print timestamp. */ + private final AtomicLong lastProgressPrintTs = new AtomicLong(0); /** Calculation executor. */ private volatile ExecutorService calcExecutor; @@ -97,7 +115,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, } /** {@inheritDoc} */ - @Override public Map<PartitionKey, ValidateIndexesPartitionResult> call() throws Exception { + @Override public VisorValidateIndexesJobResult call() throws Exception { calcExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); try { @@ -111,7 +129,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, /** * */ - private Map<PartitionKey, ValidateIndexesPartitionResult> call0() throws Exception { + private VisorValidateIndexesJobResult call0() throws Exception { Set<Integer> grpIds = new HashSet<>(); Set<String> missingCaches = new HashSet<>(); @@ -150,8 +168,9 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, } List<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>> procPartFutures = new ArrayList<>(); - - completionCntr.set(0); + List<Future<Map<String, ValidateIndexesPartitionResult>>> procIdxFutures = new ArrayList<>(); + List<T2<CacheGroupContext, GridDhtLocalPartition>> partArgs = new ArrayList<>(); + List<T2<GridCacheContext, Index>> idxArgs = new ArrayList<>(); for (Integer grpId : grpIds) { CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId); @@ -162,45 +181,82 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, List<GridDhtLocalPartition> parts = grpCtx.topology().localPartitions(); for (GridDhtLocalPartition part : parts) - procPartFutures.add(processPartitionAsync(grpCtx, part)); - } + partArgs.add(new T2<>(grpCtx, part)); - Map<PartitionKey, ValidateIndexesPartitionResult> res = new HashMap<>(); + GridQueryProcessor qry = ignite.context().query(); - long lastProgressLogTs = U.currentTimeMillis(); + IgniteH2Indexing indexing = (IgniteH2Indexing)qry.getIndexing(); - for (int i = 0; i < procPartFutures.size(); ) { - Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut = procPartFutures.get(i); + for (GridCacheContext ctx : grpCtx.caches()) { + Collection<GridQueryTypeDescriptor> types = qry.types(ctx.name()); - try { - Map<PartitionKey, ValidateIndexesPartitionResult> partRes = fut.get(1, TimeUnit.SECONDS); + if (!F.isEmpty(types)) { + for (GridQueryTypeDescriptor type : types) { + GridH2Table gridH2Tbl = indexing.dataTable(ctx.name(), type.tableName()); + + if (gridH2Tbl == null) + continue; - res.putAll(partRes); + ArrayList<Index> indexes = gridH2Tbl.getIndexes(); - i++; + for (Index idx : indexes) + idxArgs.add(new T2<>(ctx, idx)); + } + } } - catch (InterruptedException | ExecutionException e) { - for (int j = i + 1; j < procPartFutures.size(); j++) - procPartFutures.get(j).cancel(false); - - if (e instanceof InterruptedException) - throw new IgniteInterruptedException((InterruptedException)e); - else if (e.getCause() instanceof IgniteException) - throw (IgniteException)e.getCause(); - else - throw new IgniteException(e.getCause()); + } + + // To decrease contention on same indexes. + Collections.shuffle(partArgs); + Collections.shuffle(idxArgs); + + for (T2<CacheGroupContext, GridDhtLocalPartition> t2 : partArgs) + procPartFutures.add(processPartitionAsync(t2.get1(), t2.get2())); + + for (T2<GridCacheContext, Index> t2 : idxArgs) + procIdxFutures.add(processIndexAsync(t2.get1(), t2.get2())); + + totalPartitions = procPartFutures.size(); + totalIndexes = procIdxFutures.size(); + + Map<PartitionKey, ValidateIndexesPartitionResult> partResults = new HashMap<>(); + Map<String, ValidateIndexesPartitionResult> idxResults = new HashMap<>(); + + int curPart = 0; + int curIdx = 0; + try { + for (; curPart < procPartFutures.size(); curPart++) { + Future<Map<PartitionKey, ValidateIndexesPartitionResult>> fut = procPartFutures.get(curPart); + + Map<PartitionKey, ValidateIndexesPartitionResult> partRes = fut.get(); + + partResults.putAll(partRes); } - catch (TimeoutException ignored) { - if (U.currentTimeMillis() - lastProgressLogTs > 60 * 1000L) { - lastProgressLogTs = U.currentTimeMillis(); - log.warning("ValidateIndexesClosure is still running, processed " + completionCntr.get() + " of " + - procPartFutures.size() + " local partitions"); - } + for (; curIdx < procIdxFutures.size(); curIdx++) { + Future<Map<String, ValidateIndexesPartitionResult>> fut = procIdxFutures.get(curIdx); + + Map<String, ValidateIndexesPartitionResult> idxRes = fut.get(); + + idxResults.putAll(idxRes); } } + catch (InterruptedException | ExecutionException e) { + for (int j = curPart; j < procPartFutures.size(); j++) + procPartFutures.get(j).cancel(false); + + for (int j = curIdx; j < procIdxFutures.size(); j++) + procIdxFutures.get(j).cancel(false); + + if (e instanceof InterruptedException) + throw new IgniteInterruptedException((InterruptedException)e); + else if (e.getCause() instanceof IgniteException) + throw (IgniteException)e.getCause(); + else + throw new IgniteException(e.getCause()); + } - return res; + return new VisorValidateIndexesJobResult(partResults, idxResults); } /** @@ -245,12 +301,24 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion()); - partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary, consId); + partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary, consId, null); boolean enoughIssues = false; - long keysProcessed = 0; - long lastProgressLog = U.currentTimeMillis(); + GridQueryProcessor qryProcessor = ignite.context().query(); + + Method m; + try { + m = GridQueryProcessor.class.getDeclaredMethod("typeByValue", String.class, + CacheObjectContext.class, KeyCacheObject.class, CacheObject.class, boolean.class); + } + catch (NoSuchMethodException e) { + log.error("Failed to invoke typeByValue", e); + + throw new IgniteException(e); + } + + m.setAccessible(true); while (it.hasNextX()) { if (enoughIssues) @@ -266,14 +334,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, if (cacheCtx == null) throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId); - GridQueryProcessor qryProcessor = ignite.context().query(); - try { - Method m = GridQueryProcessor.class.getDeclaredMethod("typeByValue", String.class, - CacheObjectContext.class, KeyCacheObject.class, CacheObject.class, boolean.class); - - m.setAccessible(true); - QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke( qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(), row.value(), true); @@ -298,7 +359,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, Cursor cursor = idx.find((Session) null, h2Row, h2Row); if (cursor == null || !cursor.next()) - throw new IgniteCheckedException("Key not found."); + throw new IgniteCheckedException("Key is present in CacheDataTree, but can't be found in SQL index."); } catch (Throwable t) { Object o = CacheObjectUtils.unwrapBinaryIfNeeded( @@ -313,7 +374,7 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, } } } - catch (IllegalAccessException | NoSuchMethodException e) { + catch (IllegalAccessException e) { log.error("Failed to invoke typeByValue", e); throw new IgniteException(e); @@ -325,16 +386,6 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, throw new IgniteException(target); } - finally { - keysProcessed++; - - if (U.currentTimeMillis() - lastProgressLog >= 60_000 && partSize > 0) { - log.warning("Processing partition " + part.id() + " (" + (keysProcessed * 100 / partSize) + - "% " + keysProcessed + "/" + partSize + ")"); - - lastProgressLog = U.currentTimeMillis(); - } - } } } catch (IgniteCheckedException e) { @@ -345,12 +396,107 @@ public class ValidateIndexesClosure implements IgniteCallable<Map<PartitionKey, } finally { part.release(); + + printProgressIfNeeded(); } PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName()); - completionCntr.incrementAndGet(); + processedPartitions.incrementAndGet(); return Collections.singletonMap(partKey, partRes); } + + /** + * + */ + private void printProgressIfNeeded() { + long curTs = U.currentTimeMillis(); + + long lastTs = lastProgressPrintTs.get(); + + if (curTs - lastTs >= 60_000 && lastProgressPrintTs.compareAndSet(lastTs, curTs)) { + log.warning("Current progress of ValidateIndexesClosure: processed " + + processedPartitions.get() + " of " + totalPartitions + " partitions, " + + processedIndexes.get() + " of " + totalIndexes + " SQL indexes"); + } + } + + /** + * @param ctx Context. + * @param idx Index. + */ + private Future<Map<String, ValidateIndexesPartitionResult>> processIndexAsync(GridCacheContext ctx, Index idx) { + return calcExecutor.submit(new Callable<Map<String, ValidateIndexesPartitionResult>>() { + @Override public Map<String, ValidateIndexesPartitionResult> call() throws Exception { + return processIndex(ctx, idx); + } + }); + } + + /** + * @param ctx Context. + * @param idx Index. + */ + private Map<String, ValidateIndexesPartitionResult> processIndex(GridCacheContext ctx, Index idx) { + Object consId = ignite.context().discovery().localNode().consistentId(); + + ValidateIndexesPartitionResult idxValidationRes = new ValidateIndexesPartitionResult( + -1, -1, true, consId, idx.getName()); + + boolean enoughIssues = false; + + Cursor cursor = null; + + try { + cursor = idx.find((Session)null, null, null); + + if (cursor == null) + throw new IgniteCheckedException("Can't iterate through index: " + idx); + } + catch (Throwable t) { + IndexValidationIssue is = new IndexValidationIssue(null, ctx.name(), idx.getName(), t); + + log.error("Find in index failed: " + is.toString()); + + enoughIssues = true; + } + + while (!enoughIssues) { + KeyCacheObject h2key = null; + + try { + if (!cursor.next()) + break; + + GridH2Row h2Row = (GridH2Row)cursor.get(); + + h2key = h2Row.key(); + + CacheDataRow cacheDataStoreRow = ctx.group().offheap().read(ctx, h2key); + + if (cacheDataStoreRow == null) + throw new IgniteCheckedException("Key is present in SQL index, but can't be found in CacheDataTree."); + } + catch (Throwable t) { + Object o = CacheObjectUtils.unwrapBinaryIfNeeded( + ctx.cacheObjectContext(), h2key, true, true); + + IndexValidationIssue is = new IndexValidationIssue( + String.valueOf(o), ctx.name(), idx.getName(), t); + + log.error("Failed to lookup key: " + is.toString()); + + enoughIssues |= idxValidationRes.reportIssue(is); + } + } + + String uniqueIdxName = "[cache=" + ctx.name() + ", idx=" + idx.getName() + "]"; + + processedIndexes.incrementAndGet(); + + printProgressIfNeeded(); + + return Collections.singletonMap(uniqueIdxName, idxValidationRes); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java index 1a89c2c..52b48a5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/visor/verify/VisorValidateIndexesTask.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteException; import org.apache.ignite.compute.ComputeJobResult; -import org.apache.ignite.internal.processors.cache.verify.PartitionKey; import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.visor.VisorJob; @@ -81,10 +80,7 @@ public class VisorValidateIndexesTask extends VisorMultiNodeTask<VisorValidateIn ignite.context().resource().injectGeneric(clo); - Map<PartitionKey, ValidateIndexesPartitionResult> res = clo.call(); - - return new VisorValidateIndexesJobResult(res); - + return clo.call(); } catch (Exception e) { throw new IgniteException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java index bc99981..c896736 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalLoca import org.apache.ignite.internal.processors.cache.ttl.CacheTtlTransactionalPartitionedSelfTest; import org.apache.ignite.internal.processors.client.IgniteDataStreamerTest; import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelperTest; +import org.apache.ignite.util.GridCommandHandlerIndexingTest; /** * Cache tests using indexing. @@ -81,6 +82,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite { suite.addTestSuite(IgniteDataStreamerTest.class); + suite.addTestSuite(GridCommandHandlerIndexingTest.class); + return suite; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/88a6bfdd/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java index 9e9c777..62d3fc0 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/util/GridCommandHandlerIndexingTest.java @@ -19,17 +19,32 @@ package org.apache.ignite.util; import java.io.Serializable; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.tree.SearchRow; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK; @@ -38,36 +53,202 @@ import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK */ public class GridCommandHandlerIndexingTest extends GridCommandHandlerTest { /** - * + * Tests that validation doesn't fail if nothing is broken. */ - public void testValidateIndexes() throws Exception { + public void testValidateIndexesNoErrors() throws Exception { Ignite ignite = startGrids(2); ignite.cluster().active(true); Ignite client = startGrid("client"); - IgniteCache<Integer, Person> personCache = client.getOrCreateCache(new CacheConfiguration<Integer, Person>() - .setName("persons-cache-vi") - .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) - .setAtomicityMode(CacheAtomicityMode.ATOMIC) - .setBackups(1) - .setQueryEntities(F.asList(personEntity(true, true))) - .setAffinity(new RendezvousAffinityFunction(false, 32))); + String cacheName = "persons-cache-vi"; + + IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName); ThreadLocalRandom rand = ThreadLocalRandom.current(); - for (int i = 0; i < 1000; i++) + for (int i = 0; i < 10_000; i++) personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong()))); injectTestSystemOut(); - assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", "persons-cache-vi")); + assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName)); assertTrue(testOut.toString().contains("validate_indexes has finished, no issues found")); } /** + * Tests that missing rows in CacheDataTree are detected. + */ + public void testBrokenCacheDataTreeShouldFailValidation() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().active(true); + + Ignite client = startGrid("client"); + + String cacheName = "persons-cache-vi"; + + IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName); + + ThreadLocalRandom rand = ThreadLocalRandom.current(); + + for (int i = 0; i < 10_000; i++) + personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong()))); + + breakCacheDataTree(ignite, cacheName, 1); + + injectTestSystemOut(); + + assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName)); + + assertTrue(testOut.toString().contains("validate_indexes has finished with errors")); + } + + /** + * Tests that missing rows in H2 indexes are detected. + */ + public void testBrokenSqlIndexShouldFailValidation() throws Exception { + Ignite ignite = startGrids(2); + + ignite.cluster().active(true); + + Ignite client = startGrid("client"); + + String cacheName = "persons-cache-vi"; + + IgniteCache<Integer, Person> personCache = createPersonCache(client, cacheName); + + ThreadLocalRandom rand = ThreadLocalRandom.current(); + + for (int i = 0; i < 10_000; i++) + personCache.put(i, new Person(rand.nextInt(), String.valueOf(rand.nextLong()))); + + breakSqlIndex(ignite, cacheName); + + injectTestSystemOut(); + + assertEquals(EXIT_CODE_OK, execute("--cache", "validate_indexes", cacheName)); + + assertTrue(testOut.toString().contains("validate_indexes has finished with errors")); + } + + /** + * Removes some entries from a partition skipping index update. This effectively breaks the index. + */ + private void breakCacheDataTree(Ignite ig, String cacheName, int partId) { + IgniteEx ig0 = (IgniteEx)ig; + int cacheId = CU.cacheId(cacheName); + + ScanQuery scanQry = new ScanQuery(partId); + + GridCacheContext<Object, Object> ctx = ig0.context().cache().context().cacheContext(cacheId); + + // Get current update counter + String grpName = ig0.context().cache().context().cacheContext(cacheId).config().getGroupName(); + int cacheGrpId = grpName == null ? cacheName.hashCode() : grpName.hashCode(); + + GridDhtLocalPartition locPart = ctx.dht().topology().localPartition(partId); + IgniteCacheOffheapManager.CacheDataStore dataStore = ig0.context().cache().context().cache().cacheGroup(cacheGrpId).offheap().dataStore(locPart); + + Iterator<Cache.Entry> it = ig.cache(cacheName).withKeepBinary().query(scanQry).iterator(); + + for (int i = 0; i < 5_000; i++) { + if (it.hasNext()) { + Cache.Entry entry = it.next(); + + if (i % 5 == 0) { + // Do update + GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database(); + + db.checkpointReadLock(); + + try { + IgniteCacheOffheapManager.CacheDataStore innerStore = U.field(dataStore, "delegate"); + + // IgniteCacheOffheapManagerImpl.CacheDataRowStore + Object rowStore = U.field(innerStore, "rowStore"); + + // IgniteCacheOffheapManagerImpl.CacheDataTree + Object dataTree = U.field(innerStore, "dataTree"); + + CacheDataRow oldRow = U.invoke( + dataTree.getClass(), + dataTree, + "remove", + new SearchRow(cacheId, ctx.toCacheKeyObject(entry.getKey()))); + + if (oldRow != null) + U.invoke(rowStore.getClass(), rowStore, "removeRow", oldRow.link()); + } + catch (IgniteCheckedException e) { + System.out.println("Failed to remove key skipping indexes: " + entry); + + e.printStackTrace(); + } + finally { + db.checkpointReadUnlock(); + } + } + } + else { + System.out.println("Early exit for index corruption, keys processed: " + i); + + break; + } + } + } + + /** + * Removes some entries from H2 trees skipping partition updates. This effectively breaks the index. + */ + private void breakSqlIndex(Ignite ig, String cacheName) throws Exception { + GridQueryProcessor qry = ((IgniteEx)ig).context().query(); + + GridCacheContext<Object, Object> ctx = ((IgniteEx)ig).cachex(cacheName).context(); + + GridDhtLocalPartition locPart = ctx.topology().localPartitions().get(0); + + GridIterator<CacheDataRow> it = ctx.group().offheap().partitionIterator(locPart.id()); + + for (int i = 0; i < 500; i++) { + if (!it.hasNextX()) { + System.out.println("Early exit for index corruption, keys processed: " + i); + + break; + } + + CacheDataRow row = it.nextX(); + + ctx.shared().database().checkpointReadLock(); + + try { + qry.remove(ctx, row); + } + finally { + ctx.shared().database().checkpointReadUnlock(); + } + } + } + + /** + * Dynamically creates cache with SQL indexes. + * + * @param ig Client. + * @param cacheName Cache name. + */ + private IgniteCache<Integer, Person> createPersonCache(Ignite ig, String cacheName) { + return ig.getOrCreateCache(new CacheConfiguration<Integer, Person>() + .setName(cacheName) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setAtomicityMode(CacheAtomicityMode.ATOMIC) + .setBackups(1) + .setQueryEntities(F.asList(personEntity(true, true))) + .setAffinity(new RendezvousAffinityFunction(false, 32))); + } + + /** * @param idxName Index name. * @param idxOrgId Index org id. */
