ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eeb215f7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eeb215f7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eeb215f7 Branch: refs/heads/ignite-5075 Commit: eeb215f742ee97efe4b96367f1547b6ddd945f7c Parents: fffba0d Author: sboikov <[email protected]> Authored: Thu May 11 11:41:39 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu May 11 11:41:39 2017 +0300 ---------------------------------------------------------------------- .../internal/processors/cache/GridCacheAdapter.java | 2 +- .../processors/cache/IgniteCacheOffheapManager.java | 9 +++++++-- .../cache/IgniteCacheOffheapManagerImpl.java | 16 +++++++++++++--- .../distributed/dht/GridDhtLocalPartition.java | 7 +++++-- .../cache/query/GridCacheQueryManager.java | 4 ++-- .../continuous/CacheContinuousQueryManager.java | 5 ++++- 6 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb215f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 650eefb..e1da75a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -2930,7 +2930,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size())); do { - for (Iterator<CacheDataRow> it = ctx.offheap().iterator(true, true, null); + for 
(Iterator<CacheDataRow> it = ctx.offheap().iteratorForCache(ctx.cacheId(), true, true, null); it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH; ) keys.add((K)it.next().key()); http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb215f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 4a464d8..5a3e0c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -175,7 +175,10 @@ public interface IgniteCacheOffheapManager { * @return Rows iterator. * @throws IgniteCheckedException If failed. */ - public GridIterator<CacheDataRow> iterator(boolean primary, boolean backup, final AffinityTopologyVersion topVer) + public GridIterator<CacheDataRow> iteratorForCache(int cacheId, + boolean primary, + boolean backup, + final AffinityTopologyVersion topVer) throws IgniteCheckedException; /** @@ -183,7 +186,9 @@ public interface IgniteCacheOffheapManager { * @return Partition data iterator. * @throws IgniteCheckedException If failed. */ - public GridIterator<CacheDataRow> iterator(final int part) throws IgniteCheckedException; + public GridIterator<CacheDataRow> iteratorForCache(int cacheId, final int part) throws IgniteCheckedException; + + public GridIterator<CacheDataRow> partitionIterator(final int part) throws IgniteCheckedException; /** * @param part Partition. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb215f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index d70b34d..368b86c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -536,9 +536,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public GridIterator<CacheDataRow> iterator(boolean primary, boolean backups, + @Override public GridIterator<CacheDataRow> iteratorForCache( + int cacheId, + boolean primary, + boolean backups, final AffinityTopologyVersion topVer) throws IgniteCheckedException { + // TODO IGNITE-5075. return rowsIterator(primary, backups, topVer); } @@ -601,7 +605,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public GridIterator<CacheDataRow> iterator(int part) throws IgniteCheckedException { + @Override public GridIterator<CacheDataRow> iteratorForCache(int cacheId, int part) throws IgniteCheckedException { + // TODO IGNITE-5075. 
+ return partitionIterator(part); + } + + /** {@inheritDoc} */ + @Override public GridIterator<CacheDataRow> partitionIterator(int part) throws IgniteCheckedException { CacheDataStore data = partitionData(part); if (data == null) @@ -668,7 +678,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public IgniteRebalanceIterator rebalanceIterator(int part, AffinityTopologyVersion topVer, Long partCntr) throws IgniteCheckedException { - final GridIterator<CacheDataRow> it = iterator(part); + final GridIterator<CacheDataRow> it = partitionIterator(part); return new IgniteRebalanceIterator() { @Override public boolean historical() { http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb215f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 53c4939..63df21b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -921,10 +921,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } if (!grp.allowFastEviction()) { - GridCacheContext cctx = grp.cacheContext(); + GridCacheContext cctx = grp.sharedGroup() ? 
null : grp.cacheContext(); try { - GridIterator<CacheDataRow> it0 = grp.offheap().iterator(id); + GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); while (it0.hasNext()) { ctx.database().checkpointReadLock(); @@ -932,6 +932,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements try { CacheDataRow row = it0.next(); + if (grp.sharedGroup() && (cctx == null || cctx.cacheId() != row.cacheId())) + cctx = ctx.cacheContext(row.cacheId()); + GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx, grp.affinity().lastVersion(), row.key(), http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb215f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index fcf534c..1d1747d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -863,12 +863,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte locPart = locPart0; - it = cctx.offheap().iterator(part); + it = cctx.offheap().iteratorForCache(cctx.cacheId(), part); } else { locPart = null; - it = cctx.offheap().iterator(true, backups, topVer); + it = cctx.offheap().iteratorForCache(cctx.cacheId(), true, backups, topVer); } return new PeekValueExpiryAwareIterator(it, plc, topVer, keyValFilter, qry.keepBinary(), locNode) { http://git-wip-us.apache.org/repos/asf/ignite/blob/eeb215f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java 
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index bc703a2..76b4359 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -656,7 +656,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         }
 
         if (notifyExisting) {
-            final Iterator<CacheDataRow> it = cctx.offheap().iterator(true, true, AffinityTopologyVersion.NONE);
+            final Iterator<CacheDataRow> it = cctx.offheap().iteratorForCache(cctx.cacheId(),
+                true,
+                true,
+                AffinityTopologyVersion.NONE);
 
             locLsnr.onUpdated(new Iterable<CacheEntryEvent>() {
                 @Override public Iterator<CacheEntryEvent> iterator() {
