IGNITE-5553 Ignite PDS 2: IgnitePersistentStoreDataStructuresTest testSet assertion error
Signed-off-by: Anton Vinogradov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b3742f4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b3742f4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b3742f4 Branch: refs/heads/ignite-7251 Commit: 1b3742f4d7bedf0bb5c262786d386647b5b86e35 Parents: 387674e Author: pereslegin-pa <[email protected]> Authored: Mon Sep 17 18:08:41 2018 +0300 Committer: Anton Vinogradov <[email protected]> Committed: Mon Sep 17 18:08:41 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheGroupContext.java | 2 - .../processors/cache/GridCacheMapEntry.java | 10 -- .../CacheDataStructuresManager.java | 108 +++---------------- .../cache/query/GridCacheQueryManager.java | 53 +++------ .../datastructures/DataStructuresProcessor.java | 2 +- .../datastructures/GridCacheSetImpl.java | 15 +-- .../GridCacheSetAbstractSelfTest.java | 11 +- ...IgnitePersistentStoreDataStructuresTest.java | 2 - 8 files changed, 38 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index b92280a..d5e2d66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -795,8 +795,6 @@ public class CacheGroupContext { cctx.dr().partitionEvicted(part); cctx.continuousQueries().onPartitionEvicted(part); - - cctx.dataStructures().onPartitionEvicted(part); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 4806733..fd8b2cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1526,8 +1526,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, topVer); } - - cctx.dataStructures().onEntryUpdated(key, false, keepBinary); } finally { unlockEntry(); @@ -1740,8 +1738,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme topVer); } - cctx.dataStructures().onEntryUpdated(key, true, keepBinary); - deferred = cctx.deferredDelete() && !detached() && !isInternal(); if (intercept) @@ -2133,8 +2129,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme onUpdateFinished(updateCntr); } - cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); - if (intercept) { if (op == GridCacheOperation.UPDATE) cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(cctx, key, key0, updated, updated0, keepBinary, 0L)); @@ -2427,8 +2421,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme topVer); } - cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary); - if (intercept) { if (c.op == GridCacheOperation.UPDATE) { cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( @@ -3377,8 +3369,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme updateCntr, null, topVer); - - cctx.dataStructures().onEntryUpdated(key, false, false); } onUpdateFinished(updateCntr); http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 7d45c81..932f000 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -25,7 +25,6 @@ import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.UUID; import java.util.concurrent.Callable; @@ -33,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.Cache; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; @@ -40,6 +40,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSet; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -49,7 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheGateway; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.datastructures.GridAtomicCacheQueueImpl; import org.apache.ignite.internal.processors.datastructures.GridCacheQueueHeader; @@ -62,7 +62,6 @@ import org.apache.ignite.internal.processors.datastructures.GridCacheSetProxy; import org.apache.ignite.internal.processors.datastructures.GridTransactionalCacheQueueImpl; import org.apache.ignite.internal.processors.datastructures.SetItemKey; import org.apache.ignite.internal.processors.task.GridInternal; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -105,10 +104,6 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { /** Sets map. */ private final ConcurrentMap<IgniteUuid, GridCacheSetProxy> setsMap; - /** Set keys used for set iteration. */ - private ConcurrentMap<IgniteUuid, GridConcurrentHashSet<SetItemKey>> setDataMap = - new ConcurrentHashMap<>(); - /** Queues map. */ private final ConcurrentMap<IgniteUuid, GridCacheQueueProxy> queuesMap; @@ -344,45 +339,6 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } /** - * Entry update callback. - * - * @param key Key. - * @param rmv {@code True} if entry was removed. - * @param keepBinary Keep binary flag. - */ - public void onEntryUpdated(KeyCacheObject key, boolean rmv, boolean keepBinary) { - // No need to notify data structures manager for a user cache since all DS objects are stored - // in system caches. - if (cctx.userCache()) - return; - - Object key0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false); - - if (key0 instanceof SetItemKey) - onSetItemUpdated((SetItemKey)key0, rmv); - } - - /** - * Partition evicted callback. - * - * @param part Partition number. - */ - public void onPartitionEvicted(int part) { - GridCacheAffinityManager aff = cctx.affinity(); - - for (GridConcurrentHashSet<SetItemKey> set : setDataMap.values()) { - Iterator<SetItemKey> iter = set.iterator(); - - while (iter.hasNext()) { - SetItemKey key = iter.next(); - - if (aff.partition(key) == part) - iter.remove(); - } - } - } - - /** * @param name Set name. * @param colloc Collocated flag. * @param create Create flag. @@ -462,14 +418,6 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } /** - * @param id Set ID. - * @return Data for given set. - */ - @Nullable public GridConcurrentHashSet<SetItemKey> setData(IgniteUuid id) { - return setDataMap.get(id); - } - - /** * @param setId Set ID. * @param topVer Topology version. * @throws IgniteCheckedException If failed. @@ -486,22 +434,19 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { cctx.preloader().syncFuture().get(); } - GridConcurrentHashSet<SetItemKey> set = setDataMap.get(setId); - - if (set == null) - return; - - IgniteInternalCache cache = cctx.cache(); + IgniteInternalCache<?, ?> cache = cctx.cache(); final int BATCH_SIZE = 100; Collection<SetItemKey> keys = new ArrayList<>(BATCH_SIZE); - for (SetItemKey key : set) { - if (!loc && !aff.primaryByKey(cctx.localNode(), key, topVer)) + for (Cache.Entry entry : cache.localEntries(new CachePeekMode[] {CachePeekMode.PRIMARY})) { + Object obj = entry.getKey(); + + if (!(obj instanceof SetItemKey && setId.equals(((SetItemKey)obj).setId()))) continue; - keys.add(key); + keys.add((SetItemKey)obj); if (keys.size() == BATCH_SIZE) { retryRemoveAll(cache, keys); @@ -512,16 +457,15 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { if (!keys.isEmpty()) retryRemoveAll(cache, keys); - - setDataMap.remove(setId); } /** * @param id Set ID. + * @param separated Separated cache flag. * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - public void removeSetData(IgniteUuid id) throws IgniteCheckedException { + public void removeSetData(IgniteUuid id, boolean separated) throws IgniteCheckedException { assert id != null; if (!cctx.isLocal()) { @@ -536,6 +480,10 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { nodes, true, 0, false).get(); + + // Separated cache will be destroyed after the set is blocked. + if (separated) + break; } catch (IgniteCheckedException e) { if (e.hasCause(ClusterTopologyCheckedException.class)) { @@ -604,34 +552,6 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } /** - * @param key Set item key. - * @param rmv {@code True} if item was removed. - */ - private void onSetItemUpdated(SetItemKey key, boolean rmv) { - // Items stored in a separate cache don't have identifier. - if (key.setId() == null) - return; - - GridConcurrentHashSet<SetItemKey> set = setDataMap.get(key.setId()); - - if (set == null) { - if (rmv) - return; - - GridConcurrentHashSet<SetItemKey> old = setDataMap.putIfAbsent(key.setId(), - set = new GridConcurrentHashSet<>()); - - if (old != null) - set = old; - } - - if (rmv) - set.remove(key); - else - set.add(key); - } - - /** * @param setId Set ID. */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 982006f..5c7f383 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -47,6 +47,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.cache.CacheEntry; import org.apache.ignite.cache.QueryIndexType; import org.apache.ignite.cache.query.QueryMetrics; import org.apache.ignite.cluster.ClusterNode; @@ -616,7 +617,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte break; case SET: - iter = setIterator(qry); + iter = sharedCacheSetIterator(qry); break; @@ -748,49 +749,29 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param qry Query. * @return Cache set items iterator. */ - private GridCloseableIterator<IgniteBiTuple<K, V>> setIterator(GridCacheQueryAdapter<?> qry) { + private GridCloseableIterator<IgniteBiTuple<K, V>> sharedCacheSetIterator( + GridCacheQueryAdapter<?> qry) throws IgniteCheckedException { final GridSetQueryPredicate filter = (GridSetQueryPredicate)qry.scanFilter(); - filter.init(cctx); - IgniteUuid id = filter.setId(); - Collection<SetItemKey> data = cctx.dataStructures().setData(id); - - if (data == null) - data = Collections.emptyList(); - - final GridIterator<IgniteBiTuple<K, V>> it = F.iterator( - data, - new C1<SetItemKey, IgniteBiTuple<K, V>>() { - @Override public IgniteBiTuple<K, V> apply(SetItemKey e) { - return new IgniteBiTuple<>((K)e.item(), (V)Boolean.TRUE); + GridCacheQueryAdapter<CacheEntry<K, ?>> qry0 = new GridCacheQueryAdapter<>(cctx, + SCAN, + new IgniteBiPredicate<Object, Object>() { + @Override public boolean apply(Object k, Object v) { + return k instanceof SetItemKey && id.equals(((SetItemKey)k).setId()); } }, - true, - new P1<SetItemKey>() { - @Override public boolean apply(SetItemKey e) { - return filter.apply(e, null); + new IgniteClosure<Map.Entry, Object>() { + @Override public Object apply(Map.Entry entry) { + return new IgniteBiTuple<K, V>((K)((SetItemKey)entry.getKey()).item(), (V)Boolean.TRUE); } - }); - - return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { - @Override protected boolean onHasNext() { - return it.hasNext(); - } - - @Override protected IgniteBiTuple<K, V> onNext() { - return it.next(); - } - - @Override protected void onRemove() { - it.remove(); - } + }, + qry.partition(), + false, + true); - @Override protected void onClose() { - // No-op. - } - }; + return scanQueryLocal(qry0, false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java index 2c77f17..92671e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java @@ -1581,7 +1581,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen hdr = (GridCacheSetHeader) cctx.cache().withNoRetries().getAndRemove(new GridCacheSetHeaderKey(name)); if (hdr != null) - cctx.dataStructures().removeSetData(hdr.id()); + cctx.dataStructures().removeSetData(hdr.id(), hdr.separated()); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index ba65d9e..f6a1378 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -45,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.query.CacheQuery; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -158,14 +157,8 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite return cache.sizeAsync(new CachePeekMode[] {}).get() - 1; } - if (ctx.isLocal() || ctx.isReplicated()) { - GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id); - - return set != null ? set.size() : 0; - } - CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), null, false, false); + new GridSetQueryPredicate<>(id, collocated), collocated ? hdrPart : null, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); @@ -192,9 +185,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite @Override public boolean isEmpty() { onAccess(); - GridConcurrentHashSet<SetItemKey> set = ctx.dataStructures().setData(id); - - return (set == null || set.isEmpty()) && size() == 0; + return size() == 0; } /** {@inheritDoc} */ @@ -437,7 +428,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite @SuppressWarnings("unchecked") private WeakReferenceCloseableIterator<T> sharedCacheIterator() throws IgniteCheckedException { CacheQuery qry = new GridCacheQueryAdapter<>(ctx, SET, null, null, - new GridSetQueryPredicate<>(id, collocated), null, false, false); + new GridSetQueryPredicate<>(id, collocated), collocated ? hdrPart : null, false, false); Collection<ClusterNode> nodes = dataNodes(ctx.affinity().affinityTopologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java index 0ed4a97..59f13d9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/GridCacheSetAbstractSelfTest.java @@ -43,6 +43,7 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.U; @@ -129,18 +130,12 @@ public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstr for (int i = 0; i < gridCount(); i++) { IgniteKernal grid = (IgniteKernal)grid(i); - for (IgniteCache cache : grid.caches()) { - CacheDataStructuresManager dsMgr = grid.internalCache(cache.getName()).context().dataStructures(); + for (IgniteInternalCache cache : grid.cachesx(null)) { + CacheDataStructuresManager dsMgr = cache.context().dataStructures(); Map map = GridTestUtils.getFieldValue(dsMgr, "setsMap"); assertEquals("Set not removed [grid=" + i + ", map=" + map + ']', 0, map.size()); - - map = GridTestUtils.getFieldValue(dsMgr, "setDataMap"); - - assertEquals("Set data not removed [grid=" + i + ", cache=" + cache.getName() + ", map=" + map + ']', - 0, - map.size()); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1b3742f4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java index 855b035..dc4e17e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java @@ -206,8 +206,6 @@ public class IgnitePersistentStoreDataStructuresTest extends GridCommonAbstractT * @throws Exception If failed. */ public void testSet() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-5553"); - Ignite ignite = startGrids(4); ignite.active(true);
