http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 099840a..a4e4c24 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collections; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -27,11 +28,9 @@ import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.DataPageEvictionMode; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -51,10 +50,8 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyCloseableIterator; @@ -81,9 +78,20 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; * */ @SuppressWarnings("PublicInnerClass") -public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter implements IgniteCacheOffheapManager { +public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager { + /** */ + private static final int UNDEFINED_CACHE_ID = 0; + + /** */ + protected GridCacheSharedContext ctx; + + /** */ + protected CacheGroupContext grp; + + /** */ + protected IgniteLogger log; + /** */ - // TODO GG-11208 need restore size after restart. private CacheDataStore locCacheDataStore; /** */ @@ -99,9 +107,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple private volatile boolean hasPendingEntries; /** */ - private static final PendingRow START_PENDING_ROW = new PendingRow(Long.MIN_VALUE, 0); - - /** */ private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000); /** */ @@ -116,77 +121,102 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - super.start0(); + @Override public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException { + this.ctx = ctx; + this.grp = grp; + this.log = ctx.logger(getClass()); - updateValSizeThreshold = cctx.shared().database().pageSize() / 2; + updateValSizeThreshold = ctx.database().pageSize() / 2; - if (cctx.affinityNode()) { - cctx.shared().database().checkpointReadLock(); + if (grp.affinityNode()) { + ctx.database().checkpointReadLock(); try { initDataStructures(); - if (cctx.isLocal()) { - assert cctx.cache() instanceof GridLocalCache : cctx.cache(); - + if (grp.isLocal()) locCacheDataStore = createCacheDataStore(0); - } } finally { - cctx.shared().database().checkpointReadUnlock(); + ctx.database().checkpointReadUnlock(); } } } - /** - * @throws IgniteCheckedException If failed. - */ - protected void initDataStructures() throws IgniteCheckedException { - if (cctx.shared().ttl().eagerTtlEnabled()) { + /** {@inheritDoc} */ + public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException{ + if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { String name = "PendingEntries"; long rootPage = allocateForTree(); - pendingEntries = new PendingEntriesTree(cctx, + pendingEntries = new PendingEntriesTree( + grp, name, - cctx.memoryPolicy().pageMemory(), + grp.memoryPolicy().pageMemory(), rootPage, - cctx.reuseList(), + grp.reuseList(), true); } } - /** {@inheritDoc} */ - @Override protected void stop0(final boolean cancel, final boolean destroy) { - super.stop0(cancel, destroy); + /** + * @throws IgniteCheckedException If failed. + */ + protected void initDataStructures() throws IgniteCheckedException { + // No-op. + } - if (destroy && cctx.affinityNode()) - destroyCacheDataStructures(destroy); + /** {@inheritDoc} */ + @Override public void stopCache(int cacheId, final boolean destroy) { + if (destroy && grp.affinityNode()) + destroyCacheDataStructures(cacheId, destroy); } /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { - super.onKernalStop0(cancel); + @Override public void stop() { + try { + for (CacheDataStore store : cacheDataStores()) + store.destroy(); + + if (pendingEntries != null) + pendingEntries.destroy(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e.getMessage(), e); + } + } + /** {@inheritDoc} */ + @Override public void onKernalStop() { busyLock.block(); } /** * */ - protected void destroyCacheDataStructures(boolean destroy) { - assert cctx.affinityNode(); + protected void destroyCacheDataStructures(int cacheId, boolean destroy) { + assert grp.affinityNode(); try { - if (locCacheDataStore != null) - locCacheDataStore.destroy(); + if (grp.sharedGroup()) { + assert cacheId != UNDEFINED_CACHE_ID; - if (pendingEntries != null) - pendingEntries.destroy(); + for (CacheDataStore store : cacheDataStores()) + store.clear(cacheId); - for (CacheDataStore store : partDataStores.values()) - store.destroy(); + if (pendingEntries != null) { + PendingRow row = new PendingRow(cacheId); + + GridCursor<PendingRow> cursor = pendingEntries.find(row, row, PendingEntriesTree.WITHOUT_KEY); + + while (cursor.next()) { + boolean res = pendingEntries.removex(cursor.get()); + + assert res; + } + } + } } catch (IgniteCheckedException e) { throw new IgniteException(e.getMessage(), e); @@ -198,7 +228,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @return Data store for given entry. */ public CacheDataStore dataStore(GridDhtLocalPartition part) { - if (cctx.isLocal()) + if (grp.isLocal()) return locCacheDataStore; else { assert part != null; @@ -208,14 +238,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public long entriesCount() { - if (cctx.isLocal()) - return locCacheDataStore.size(); - + @Override public long cacheEntriesCount(int cacheId) { long size = 0; - for (CacheDataStore store : partDataStores.values()) - size += store.size(); + for (CacheDataStore store : cacheDataStores()) + size += store.cacheSize(cacheId); return size; } @@ -225,59 +252,41 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @return Partition data. */ @Nullable private CacheDataStore partitionData(int p) { - if (cctx.isLocal()) + if (grp.isLocal()) return locCacheDataStore; else { - GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false); + GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false); return part != null ? part.dataStore() : null; } } /** {@inheritDoc} */ - @Override public long entriesCount( + @Override public long cacheEntriesCount( + int cacheId, boolean primary, boolean backup, AffinityTopologyVersion topVer ) throws IgniteCheckedException { - if (cctx.isLocal()) - return entriesCount(0); + if (grp.isLocal()) + return cacheEntriesCount(cacheId, 0); else { - ClusterNode locNode = cctx.localNode(); - long cnt = 0; - for (GridDhtLocalPartition locPart : cctx.topology().currentLocalPartitions()) { - if (primary) { - if (cctx.affinity().primaryByPartition(locNode, locPart.id(), topVer)) { - cnt += locPart.dataStore().size(); - - continue; - } - } + Iterator<CacheDataStore> it = cacheData(primary, backup, topVer); - if (backup) { - if (cctx.affinity().backupByPartition(locNode, locPart.id(), topVer)) - cnt += locPart.dataStore().size(); - } - } + while (it.hasNext()) + cnt += it.next().cacheSize(cacheId); return cnt; } } /** {@inheritDoc} */ - @Override public long entriesCount(int part) { - if (cctx.isLocal()) { - assert part == 0; + @Override public long cacheEntriesCount(int cacheId, int part) { + CacheDataStore store = partitionData(part); - return locCacheDataStore.size(); - } - else { - GridDhtLocalPartition locPart = cctx.topology().localPartition(part, AffinityTopologyVersion.NONE, false); - - return locPart == null ? 0 : locPart.dataStore().size(); - } + return store == null ? 0 : store.cacheSize(cacheId); } /** @@ -289,10 +298,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple private Iterator<CacheDataStore> cacheData(boolean primary, boolean backup, AffinityTopologyVersion topVer) { assert primary || backup; - if (cctx.isLocal()) - return Collections.singleton(locCacheDataStore).iterator(); + if (grp.isLocal()) + return singletonIterator(locCacheDataStore); else { - final Iterator<GridDhtLocalPartition> it = cctx.topology().currentLocalPartitions().iterator(); + final Iterator<GridDhtLocalPartition> it = grp.topology().currentLocalPartitions().iterator(); if (primary && backup) { return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() { @@ -302,8 +311,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple }, true); } - final Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) : - cctx.affinity().backupPartitions(cctx.localNodeId(), topVer); + final Set<Integer> parts = primary ? grp.affinity().primaryPartitions(ctx.localNodeId(), topVer) : + grp.affinity().backupPartitions(ctx.localNodeId(), topVer); return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() { @Override public CacheDataStore apply(GridDhtLocalPartition part) { @@ -319,15 +328,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public void invoke(KeyCacheObject key, + @Override public void invoke( + GridCacheContext cctx, + KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c) throws IgniteCheckedException { - dataStore(part).invoke(key, c); + dataStore(part).invoke(cctx, key, c); } /** {@inheritDoc} */ @Override public void update( + GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, @@ -338,16 +350,17 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple ) throws IgniteCheckedException { assert expireTime >= 0; - dataStore(part).update(key, partId, val, ver, expireTime, oldRow); + dataStore(part).update(cctx, key, partId, val, ver, expireTime, oldRow); } /** {@inheritDoc} */ @Override public void remove( + GridCacheContext cctx, KeyCacheObject key, int partId, GridDhtLocalPartition part ) throws IgniteCheckedException { - dataStore(part).remove(key, partId); + dataStore(part).remove(cctx, key, partId); } /** {@inheritDoc} */ @@ -356,21 +369,21 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple throws IgniteCheckedException { KeyCacheObject key = entry.key(); - assert cctx.isLocal() || entry.localPartition() != null : entry; + assert grp.isLocal() || entry.localPartition() != null : entry; - return dataStore(entry.localPartition()).find(key); + return dataStore(entry.localPartition()).find(entry.context(), key); } /** {@inheritDoc} */ - @Nullable @Override public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException { + @Nullable @Override public CacheDataRow read(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { CacheDataRow row; if (cctx.isLocal()) - row = locCacheDataStore.find(key); + row = locCacheDataStore.find(cctx, key); else { GridDhtLocalPartition part = cctx.topology().localPartition(cctx.affinity().partition(key), null, false); - row = part != null ? dataStore(part).find(key) : null; + row = part != null ? dataStore(part).find(cctx, key) : null; } assert row == null || row.value() != null : row; @@ -411,17 +424,17 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @param readers {@code True} to clear readers. */ @SuppressWarnings("unchecked") - @Override public void clear(boolean readers) { + @Override public void clearCache(GridCacheContext cctx, boolean readers) { GridCacheVersion obsoleteVer = null; - GridIterator<CacheDataRow> it = rowsIterator(true, true, null); + GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator()); while (it.hasNext()) { KeyCacheObject key = it.next().key(); try { if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); + obsoleteVer = ctx.versions().next(); GridCacheEntryEx entry = cctx.cache().entryEx(key); @@ -456,11 +469,13 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") - @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(final boolean primary, + @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator( + final GridCacheContext cctx, + final boolean primary, final boolean backup, final AffinityTopologyVersion topVer, final boolean keepBinary) throws IgniteCheckedException { - final Iterator<CacheDataRow> it = rowsIterator(primary, backup, topVer); + final Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup, topVer); return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() { /** */ @@ -501,13 +516,14 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public GridCloseableIterator<KeyCacheObject> keysIterator(final int part) throws IgniteCheckedException { + @Override public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, final int part) throws IgniteCheckedException { CacheDataStore data = partitionData(part); if (data == null) return new GridEmptyCloseableIterator<>(); - final GridCursor<? extends CacheDataRow> cur = data.cursor(); + final GridCursor<? extends CacheDataRow> cur = + data.cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY); return new GridCloseableIteratorAdapter<KeyCacheObject>() { /** */ @@ -537,21 +553,41 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public GridIterator<CacheDataRow> iterator(boolean primary, boolean backups, + @Override public GridIterator<CacheDataRow> cacheIterator( + int cacheId, + boolean primary, + boolean backups, final AffinityTopologyVersion topVer) throws IgniteCheckedException { - return rowsIterator(primary, backups, topVer); + return iterator(cacheId, cacheData(primary, backups, topVer)); + } + + /** {@inheritDoc} */ + @Override public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, int part) throws IgniteCheckedException { + CacheDataStore data = partitionData(part); + + if (data == null) + return new GridEmptyCloseableIterator<>(); + + return iterator(cacheId, singletonIterator(data)); + } + + /** {@inheritDoc} */ + @Override public GridIterator<CacheDataRow> partitionIterator(int part) throws IgniteCheckedException { + CacheDataStore data = partitionData(part); + + if (data == null) + return new GridEmptyCloseableIterator<>(); + + return iterator(UNDEFINED_CACHE_ID, singletonIterator(data)); } /** - * @param primary Primary entries flag. - * @param backups Backup entries flag. - * @param topVer Topology version. - * @return Iterator. + * @param cacheId Cache ID. + * @param dataIt Data store iterator. + * @return Rows iterator */ - private GridIterator<CacheDataRow> rowsIterator(boolean primary, boolean backups, AffinityTopologyVersion topVer) { - final Iterator<CacheDataStore> dataIt = cacheData(primary, backups, topVer); - + private GridIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt) { return new GridCloseableIteratorAdapter<CacheDataRow>() { /** */ private GridCursor<? extends CacheDataRow> cur; @@ -580,7 +616,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple CacheDataStore ds = dataIt.next(); curPart = ds.partId(); - cur = ds.cursor(); + cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); } else break; @@ -601,35 +637,34 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple }; } - /** {@inheritDoc} */ - @Override public GridIterator<CacheDataRow> iterator(int part) throws IgniteCheckedException { - CacheDataStore data = partitionData(part); - - if (data == null) - return new GridEmptyCloseableIterator<>(); - - final GridCursor<? extends CacheDataRow> cur = data.cursor(); - - return new GridCloseableIteratorAdapter<CacheDataRow>() { + /** + * @param item Item. + * @return Single item iterator. + */ + private <T> Iterator<T> singletonIterator(final T item) { + return new Iterator<T>() { /** */ - private CacheDataRow next; + private boolean hasNext = true; - @Override protected CacheDataRow onNext() { - CacheDataRow res = next; - - next = null; - - return res; + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return hasNext; } - @Override protected boolean onHasNext() throws IgniteCheckedException { - if (next != null) - return true; + /** {@inheritDoc} */ + @Override public T next() { + if (hasNext) { + hasNext = false; - if (cur.next()) - next = cur.get(); + return item; + } - return next != null; + throw new NoSuchElementException(); + } + + /** {@inheritDoc} */ + @Override public void remove() { + throw new UnsupportedOperationException(); } }; } @@ -639,12 +674,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @throws IgniteCheckedException If failed. */ private long allocateForTree() throws IgniteCheckedException { - ReuseList reuseList = cctx.reuseList(); + ReuseList reuseList = grp.reuseList(); long pageId; if (reuseList == null || (pageId = reuseList.takeRecycledPage()) == 0L) - pageId = cctx.memoryPolicy().pageMemory().allocatePage(cctx.cacheId(), INDEX_PARTITION, FLAG_IDX); + pageId = grp.memoryPolicy().pageMemory().allocatePage(grp.groupId(), INDEX_PARTITION, FLAG_IDX); return pageId; } @@ -653,7 +688,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple @Override public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException { long pageId = allocateForTree(); - return new RootPage(new FullPageId(pageId, cctx.cacheId()), true); + return new RootPage(new FullPageId(pageId, grp.groupId()), true); } /** {@inheritDoc} */ @@ -663,13 +698,13 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override public ReuseList reuseListForIndex(String idxName) { - return cctx.reuseList(); + return grp.reuseList(); } /** {@inheritDoc} */ @Override public IgniteRebalanceIterator rebalanceIterator(int part, AffinityTopologyVersion topVer, Long partCntr) throws IgniteCheckedException { - final GridIterator<CacheDataRow> it = iterator(part); + final GridIterator<CacheDataRow> it = partitionIterator(part); return new IgniteRebalanceIterator() { @Override public boolean historical() { @@ -738,14 +773,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple throws IgniteCheckedException { final long rootPage = allocateForTree(); - CacheDataRowStore rowStore = new CacheDataRowStore(cctx, cctx.freeList(), p); + CacheDataRowStore rowStore = new CacheDataRowStore(grp, grp.freeList(), p); String idxName = treeName(p); - CacheDataTree dataTree = new CacheDataTree(idxName, - cctx.reuseList(), + CacheDataTree dataTree = new CacheDataTree( + grp, + idxName, + grp.reuseList(), rowStore, - cctx, rootPage, true); @@ -754,7 +790,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override public Iterable<CacheDataStore> cacheDataStores() { - if (cctx.isLocal()) + if (grp.isLocal()) return Collections.singleton(locCacheDataStore); return new Iterable<CacheDataStore>() { @@ -786,15 +822,23 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override public boolean expire( + GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount ) throws IgniteCheckedException { + assert !cctx.isNear() : cctx.name(); + if (hasPendingEntries && pendingEntries != null) { GridCacheVersion obsoleteVer = null; long now = U.currentTimeMillis(); - GridCursor<PendingRow> cur = pendingEntries.find(START_PENDING_ROW, new PendingRow(now, 0)); + GridCursor<PendingRow> cur; + + if (grp.sharedGroup()) + cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0)); + else + cur = pendingEntries.find(null, new PendingRow(UNDEFINED_CACHE_ID, now, 0)); int cleared = 0; @@ -809,9 +853,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple assert row.key != null && row.link != 0 && row.expireTime != 0 : row; - if (pendingEntries.remove(row) != null) { + if (pendingEntries.removex(row)) { if (obsoleteVer == null) - obsoleteVer = cctx.versions().next(); + obsoleteVer = ctx.versions().next(); c.apply(cctx.cache().entryEx(row.key), obsoleteVer); } @@ -848,7 +892,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple protected final AtomicLong cntr = new AtomicLong(); /** Partition size. */ - protected final AtomicLong storageSize = new AtomicLong(); + private final AtomicLong storageSize = new AtomicLong(); + + /** */ + private final ConcurrentMap<Integer, AtomicLong> cacheSizes = new ConcurrentHashMap<>(); /** Initialized update counter. */ protected Long initCntr = 0L; @@ -871,13 +918,60 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple this.dataTree = dataTree; } + /** + * @param cacheId Cache ID. + */ + void incrementSize(int cacheId) { + storageSize.incrementAndGet(); + + if (grp.sharedGroup()) { + AtomicLong size = cacheSizes.get(cacheId); + + if (size == null) { + AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = new AtomicLong()); + + if (old != null) + size = old; + } + + size.incrementAndGet(); + } + } + + /** + * @param cacheId Cache ID. + */ + void decrementSize(int cacheId) { + storageSize.decrementAndGet(); + + if (grp.sharedGroup()) { + AtomicLong size = cacheSizes.get(cacheId); + + if (size == null) + return; + + size.decrementAndGet(); + } + } + /** {@inheritDoc} */ @Override public int partId() { return partId; } /** {@inheritDoc} */ - @Override public int size() { + @Override public int cacheSize(int cacheId) { + if (grp.sharedGroup()) { + AtomicLong size = cacheSizes.get(cacheId); + + return size != null ? (int)size.get() : 0; + } + + return (int)storageSize.get(); + } + + /** {@inheritDoc} */ + @Override public int fullSize() { return (int)storageSize.get(); } @@ -907,12 +1001,13 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** + * @param cctx Cache context. * @param oldRow Old row. * @param dataRow New row. * @return {@code True} if it is possible to update old row data. * @throws IgniteCheckedException If failed. */ - private boolean canUpdateOldRow(@Nullable CacheDataRow oldRow, DataRow dataRow) + private boolean canUpdateOldRow(GridCacheContext cctx, @Nullable CacheDataRow oldRow, DataRow dataRow) throws IgniteCheckedException { if (oldRow == null || cctx.queries().enabled()) return false; @@ -933,13 +1028,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c) + @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - dataTree.invoke(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY, c); + int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID; + + dataTree.invoke(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY, c); switch (c.operationType()) { case PUT: { @@ -947,7 +1044,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple CacheDataRow oldRow = c.oldRow(); - finishUpdate(c.newRow(), oldRow); + finishUpdate(cctx, c.newRow(), oldRow); break; } @@ -955,7 +1052,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple case REMOVE: { CacheDataRow oldRow = c.oldRow(); - finishRemove(key, oldRow); + finishRemove(cctx, key, oldRow); break; } @@ -973,18 +1070,19 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public CacheDataRow createRow(KeyCacheObject key, + @Override public CacheDataRow createRow( + GridCacheContext cctx, + KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { - int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ? - 0 : cctx.cacheId(); + int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : UNDEFINED_CACHE_ID; DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId); - if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) + if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) dataRow.link(oldRow.link()); else { CacheObjectContext coCtx = cctx.cacheObjectContext(); @@ -997,11 +1095,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple assert dataRow.link() != 0 : dataRow; + if (grp.sharedGroup() && dataRow.cacheId() == UNDEFINED_CACHE_ID) + dataRow.cacheId(cctx.cacheId()); + return dataRow; } /** {@inheritDoc} */ - @Override public void update(KeyCacheObject key, + @Override public void update( + GridCacheContext cctx, + KeyCacheObject key, int p, CacheObject val, GridCacheVersion ver, @@ -1013,8 +1116,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED ? - cctx.cacheId() : 0; + int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : UNDEFINED_CACHE_ID; + + assert oldRow == null || oldRow.cacheId() == cacheId : oldRow; DataRow dataRow = new DataRow(key, val, ver, p, expireTime, cacheId); @@ -1026,7 +1130,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple CacheDataRow old; - if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) { + if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) { old = oldRow; dataRow.link(oldRow.link()); @@ -1036,6 +1140,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple assert dataRow.link() != 0 : dataRow; + if (grp.sharedGroup() && dataRow.cacheId() == UNDEFINED_CACHE_ID) + dataRow.cacheId(cctx.cacheId()); + if (oldRow != null) { old = oldRow; @@ -1045,7 +1152,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple old = dataTree.put(dataRow); } - finishUpdate(dataRow, old); + finishUpdate(cctx, dataRow, old); } finally { busyLock.leaveBusy(); @@ -1053,13 +1160,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** + * @param cctx Cache context. * @param newRow New row. * @param oldRow Old row if available. * @throws IgniteCheckedException If failed. */ - private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow) + throws IgniteCheckedException { if (oldRow == null) - storageSize.incrementAndGet(); + incrementSize(cctx.cacheId()); KeyCacheObject key = newRow.key(); @@ -1067,12 +1176,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple GridCacheQueryManager qryMgr = cctx.queries(); + int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID; + if (qryMgr.enabled()) { if (oldRow != null) { qryMgr.store(key, partId, - oldRow.value(), oldRow.version(), - newRow.value(), newRow.version(), + oldRow.value(), + oldRow.version(), + newRow.value(), + newRow.version(), expireTime, newRow.link()); } @@ -1080,7 +1193,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple qryMgr.store(key, partId, null, null, - newRow.value(), newRow.version(), + newRow.value(), + newRow.version(), expireTime, newRow.link()); } @@ -1090,30 +1204,32 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple assert oldRow.link() != 0 : oldRow; if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link())); + pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); if (newRow.link() != oldRow.link()) rowStore.removeRow(oldRow.link()); } if (pendingEntries != null && expireTime != 0) { - pendingEntries.putx(new PendingRow(expireTime, newRow.link())); + pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link())); hasPendingEntries = true; } - updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), newRow.value()); + updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value()); } /** {@inheritDoc} */ - @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException { + @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { if (!busyLock.enterBusy()) throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - CacheDataRow oldRow = dataTree.remove(new SearchRow(key)); + int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID; - finishRemove(key, oldRow); + CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key)); + + finishRemove(cctx, key, oldRow); } finally { busyLock.leaveBusy(); @@ -1121,21 +1237,26 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** + * @param cctx Cache context. * @param key Key. * @param oldRow Removed row. * @throws IgniteCheckedException If failed. */ - private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { CacheObject val = null; GridCacheVersion ver = null; if (oldRow != null) { + int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID; + assert oldRow.link() != 0 : oldRow; + assert cacheId == UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId : + "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "]."; if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link())); + pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); - storageSize.decrementAndGet(); + decrementSize(cctx.cacheId()); val = oldRow.value(); @@ -1150,19 +1271,21 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (oldRow != null) rowStore.removeRow(oldRow.link()); - updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), null); + updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null); } /** {@inheritDoc} */ - @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException { + @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { key.valueBytes(cctx.cacheObjectContext()); - CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY); + int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID; + + CacheDataRow row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); if (row != null) { row.key(key); - cctx.memoryPolicy().evictionTracker().touchPage(row.link()); + grp.memoryPolicy().evictionTracker().touchPage(row.link()); } return row; @@ -1173,19 +1296,36 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple return dataTree.find(null, null); } + /** {@inheritDoc} + * @param cacheId*/ + @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { + return cursor(cacheId, null, null); + } + /** {@inheritDoc} */ - @Override public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower, + @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower, KeyCacheObject upper) throws IgniteCheckedException { - SearchRow lowerRow = null; - SearchRow upperRow = null; + return cursor(cacheId, lower, upper, null); + } - if (lower != null) - lowerRow = new SearchRow(lower); + /** {@inheritDoc} */ + @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower, + KeyCacheObject upper, Object x) throws IgniteCheckedException { + SearchRow lowerRow; + SearchRow upperRow; - if (upper != null) - upperRow = new SearchRow(upper); + if (grp.sharedGroup()) { + assert cacheId != UNDEFINED_CACHE_ID; - return dataTree.find(lowerRow, upperRow); + lowerRow = lower != null ? new SearchRow(cacheId, lower) : new SearchRow(cacheId); + upperRow = upper != null ? new SearchRow(cacheId, upper) : new SearchRow(cacheId); + } + else { + lowerRow = lower != null ? new SearchRow(UNDEFINED_CACHE_ID, lower) : null; + upperRow = upper != null ? new SearchRow(UNDEFINED_CACHE_ID, upper) : null; + } + + return dataTree.find(lowerRow, upperRow, x); } /** {@inheritDoc} */ @@ -1215,6 +1355,46 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ + @Override public void clear(int cacheId) throws IgniteCheckedException { + assert cacheId != UNDEFINED_CACHE_ID; + + if (cacheSize(cacheId) == 0) + return; + + Exception ex = null; + + GridCursor<? extends CacheDataRow> cur = + cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY); + + while (cur.next()) { + CacheDataRow row = cur.get(); + + assert row.link() != 0 : row; + + try { + boolean res = dataTree.removex(row); + + assert res : row; + + rowStore.removeRow(row.link()); + + decrementSize(cacheId); + } + catch (IgniteCheckedException e) { + U.error(log, "Fail remove row [link=" + row.link() + "]"); + + if (ex == null) + ex = e; + else + ex.addSuppressed(e); + } + } + + if (ex != null) + throw new IgniteCheckedException("Fail destroy store", ex); + } + + /** {@inheritDoc} */ @Override public RowStore rowStore() { return rowStore; } @@ -1247,11 +1427,14 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** + * @param cctx Cache context. * @param key Key. * @param oldVal Old value. * @param newVal New value. + * @throws IgniteCheckedException If failed. */ private void updateIgfsMetrics( + GridCacheContext cctx, KeyCacheObject key, CacheObject oldVal, CacheObject newVal @@ -1259,11 +1442,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple // In case we deal with IGFS cache, count updated data if (cctx.cache().isIgfsDataCache() && !cctx.isNear() && - cctx.kernalContext() + ctx.kernalContext() .igfsHelper() .isIgfsBlockKey(key.value(cctx.cacheObjectContext(), false))) { - int oldSize = valueLength(oldVal); - int newSize = valueLength(newVal); + int oldSize = valueLength(cctx, oldVal); + int newSize = valueLength(cctx, newVal); int delta = newSize - oldSize; @@ -1275,10 +1458,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** * Isolated method to get length of IGFS block. * + * @param cctx Cache context. * @param val Value. * @return Length of value. */ - private int valueLength(@Nullable CacheObject val) { + private int valueLength(GridCacheContext cctx, @Nullable CacheObject val) { if (val == null) return 0; @@ -1301,13 +1485,28 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** */ private final int hash; + /** */ + private final int cacheId; + /** + * @param cacheId Cache ID. * @param key Key. */ - SearchRow(KeyCacheObject key) { + SearchRow(int cacheId, KeyCacheObject key) { this.key = key; + this.hash = key.hashCode(); + this.cacheId = cacheId; + } - hash = key.hashCode(); + /** + * Instantiates a new fake search row as a logic cache based bound. + * + * @param cacheId Cache ID. + */ + SearchRow(int cacheId) { + this.key = null; + this.hash = 0; + this.cacheId = cacheId; } /** {@inheritDoc} */ @@ -1324,6 +1523,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple @Override public int hash() { return hash; } + + /** {@inheritDoc} */ + @Override public int cacheId() { + return cacheId; + } } /** @@ -1339,6 +1543,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** * @param hash Hash code. * @param link Link. + * @param part Partition. * @param rowData Required row data. */ DataRow(int hash, long link, int part, CacheDataRowAdapter.RowData rowData) { @@ -1350,7 +1555,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple try { // We can not init data row lazily because underlying buffer can be concurrently cleared. - initFromLink(cctx, rowData); + initFromLink(grp, rowData); } catch (IgniteCheckedException e) { throw new IgniteException(e); @@ -1366,6 +1571,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @param ver Version. * @param part Partition. * @param expireTime Expire time. + * @param cacheId Cache ID. */ DataRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, int part, long expireTime, int cacheId) { super(0); @@ -1393,6 +1599,13 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple @Override public void link(long link) { this.link = link; } + + /** + * @param cacheId Cache ID. + */ + void cacheId(int cacheId) { + this.cacheId = cacheId; + } } /** @@ -1403,54 +1616,80 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple private final CacheDataRowStore rowStore; /** */ - private final GridCacheContext cctx; + private final CacheGroupContext grp; /** + * @param grp Ccahe group. * @param name Tree name. * @param reuseList Reuse list. * @param rowStore Row store. - * @param cctx Context. * @param metaPageId Meta page ID. * @param initNew Initialize new index. * @throws IgniteCheckedException If failed. */ public CacheDataTree( + CacheGroupContext grp, String name, ReuseList reuseList, CacheDataRowStore rowStore, - GridCacheContext cctx, long metaPageId, boolean initNew ) throws IgniteCheckedException { super(name, - cctx.cacheId(), - cctx.memoryPolicy().pageMemory(), - cctx.shared().wal(), - cctx.offheap().globalRemoveId(), + grp.groupId(), + grp.memoryPolicy().pageMemory(), + grp.shared().wal(), + grp.offheap().globalRemoveId(), metaPageId, reuseList, - DataInnerIO.VERSIONS, - DataLeafIO.VERSIONS); + grp.sharedGroup() ? CacheIdAwareDataInnerIO.VERSIONS : DataInnerIO.VERSIONS, + grp.sharedGroup() ? CacheIdAwareDataLeafIO.VERSIONS : DataLeafIO.VERSIONS); assert rowStore != null; this.rowStore = rowStore; - this.cctx = cctx; + this.grp = grp; initTree(initNew); } /** {@inheritDoc} */ - @Override protected int compare(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, CacheSearchRow row) + @Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row) throws IgniteCheckedException { - int hash = ((RowLinkIO)io).getHash(pageAddr, idx); + RowLinkIO io = (RowLinkIO)iox; + + int cmp; + + if (grp.sharedGroup()) { + assert row.cacheId() != UNDEFINED_CACHE_ID : "Cache ID is not provided: " + row; + + int cacheId = io.getCacheId(pageAddr, idx); + + assert cacheId != UNDEFINED_CACHE_ID : "Cache ID is not stored"; + + cmp = Integer.compare(cacheId, row.cacheId()); + + if (cmp != 0) + return cmp; + + if (row.key() == null) { + assert row.getClass() == SearchRow.class : row; + + // A search row with a cache ID only is used as a cache bound. + // The found position will be shifted until the exact cache bound is found; + // See for details: + // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findLowerBound() + // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findUpperBound() + return cmp; + } + } - int cmp = Integer.compare(hash, row.hash()); + cmp = Integer.compare(io.getHash(pageAddr, idx), row.hash()); if (cmp != 0) return cmp; - long link = ((RowLinkIO)io).getLink(pageAddr, idx); + long link = io.getLink(pageAddr, idx); assert row.key() != null : row; @@ -1460,14 +1699,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags) throws IgniteCheckedException { - int hash = ((RowLinkIO)io).getHash(pageAddr, idx); long link = ((RowLinkIO)io).getLink(pageAddr, idx); + int hash = ((RowLinkIO)io).getHash(pageAddr, idx); + int cacheId = ((RowLinkIO)io).getCacheId(pageAddr, idx); CacheDataRowAdapter.RowData x = flags != null ? (CacheDataRowAdapter.RowData)flags : CacheDataRowAdapter.RowData.FULL; - return rowStore.dataRow(hash, link, x); + return rowStore.dataRow(cacheId, hash, link, x); } /** @@ -1477,7 +1717,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @throws IgniteCheckedException If failed. */ private int compareKeys(KeyCacheObject key, final long link) throws IgniteCheckedException { - byte[] bytes = key.valueBytes(cctx.cacheObjectContext()); + byte[] bytes = key.valueBytes(grp.cacheObjectContext()); final long pageId = pageId(link); final long page = acquirePage(pageId); @@ -1496,7 +1736,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (data.nextLink() == 0) { long addr = pageAddr + data.offset(); - if (cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED) + if (grp.storeCacheIdInDataPage()) addr += 4; // Skip cache id. final int len = PageUtils.getInt(addr, 0); @@ -1543,10 +1783,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple // TODO GG-11768. CacheDataRowAdapter other = new CacheDataRowAdapter(link); - other.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY); + other.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY); - byte[] bytes1 = other.key().valueBytes(cctx.cacheObjectContext()); - byte[] bytes2 = key.valueBytes(cctx.cacheObjectContext()); + byte[] bytes1 = other.key().valueBytes(grp.cacheObjectContext()); + byte[] bytes2 = key.valueBytes(grp.cacheObjectContext()); int lenCmp = Integer.compare(bytes1.length, bytes2.length); @@ -1588,44 +1828,46 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple private final int partId; /** - * @param cctx Cache context. + * @param grp Cache group. * @param freeList Free list. + * @param partId Partition number. */ - public CacheDataRowStore(GridCacheContext<?, ?> cctx, FreeList freeList, int partId) { - super(cctx, freeList); + public CacheDataRowStore(CacheGroupContext grp, FreeList freeList, int partId) { + super(grp, freeList); this.partId = partId; } /** + * @param cacheId Cache ID. * @param hash Hash code. * @param link Link. * @return Search row. */ - private CacheSearchRow keySearchRow(int hash, long link) { - return new DataRow(hash, link, partId, CacheDataRowAdapter.RowData.KEY_ONLY); + private CacheSearchRow keySearchRow(int cacheId, int hash, long link) { + DataRow dataRow = new DataRow(hash, link, partId, CacheDataRowAdapter.RowData.KEY_ONLY); + + if (dataRow.cacheId() == UNDEFINED_CACHE_ID && grp.sharedGroup()) + dataRow.cacheId(cacheId); + + return dataRow; } /** + * @param cacheId Cache ID. * @param hash Hash code. * @param link Link. * @param rowData Required row data. * @return Data row. */ - private CacheDataRow dataRow(int hash, long link, CacheDataRowAdapter.RowData rowData) { - return new DataRow(hash, link, partId, rowData); - } - } + private CacheDataRow dataRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData) { + DataRow dataRow = new DataRow(hash, link, partId, rowData); - /** - * @param pageAddr Page address. - * @param off Offset. - * @param link Link. - * @param hash Hash. - */ - private static void store0(long pageAddr, int off, long link, int hash) { - PageUtils.putLong(pageAddr, off, link); - PageUtils.putInt(pageAddr, off + 8, hash); + if (dataRow.cacheId() == UNDEFINED_CACHE_ID && grp.sharedGroup()) + dataRow.cacheId(cacheId); + + return dataRow; + } } /** @@ -1645,37 +1887,50 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @return Key hash code. */ public int getHash(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Cache ID or {@code 0} if cache ID is not defined. + */ + public int getCacheId(long pageAddr, int idx); } /** * */ - public static final class DataInnerIO extends BPlusInnerIO<CacheSearchRow> implements RowLinkIO { - /** */ - public static final IOVersions<DataInnerIO> VERSIONS = new IOVersions<>( - new DataInnerIO(1) - ); - + private static abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> implements RowLinkIO { /** + * @param type Page type. * @param ver Page format version. + * @param canGetRow If we can get full row from this page. + * @param itemSize Single item size on page. */ - DataInnerIO(int ver) { - super(T_DATA_REF_INNER, ver, true, 12); + protected AbstractDataInnerIO(int type, int ver, boolean canGetRow, int itemSize) { + super(type, ver, canGetRow, itemSize); } /** {@inheritDoc} */ @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) { assert row.link() != 0; - store0(pageAddr, off, row.link(), row.hash()); + PageUtils.putLong(pageAddr, off, row.link()); + PageUtils.putInt(pageAddr, off + 8, row.hash()); + + if (storeCacheId()) { + assert row.cacheId() != UNDEFINED_CACHE_ID : row; + + PageUtils.putInt(pageAddr, off + 12, row.cacheId()); + } } /** {@inheritDoc} */ @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long pageAddr, int idx) { + int cacheId = getCacheId(pageAddr, idx); int hash = getHash(pageAddr, idx); long link = getLink(pageAddr, idx); - return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link); + return ((CacheDataTree)tree).rowStore.keySearchRow(cacheId, hash, link); } /** {@inheritDoc} */ @@ -1683,8 +1938,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple int srcIdx) { int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx); long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx); + int off = offset(dstIdx); + + PageUtils.putLong(dstPageAddr, off, link); + PageUtils.putInt(dstPageAddr, off + 8, hash); + + if (storeCacheId()) { + int cacheId = ((RowLinkIO)srcIo).getCacheId(srcPageAddr, srcIdx); + + assert cacheId != UNDEFINED_CACHE_ID; - store0(dstPageAddr, offset(dstIdx), link, hash); + PageUtils.putInt(dstPageAddr, off + 12, cacheId); + } } /** {@inheritDoc} */ @@ -1706,43 +1971,66 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple for (int i = 0; i < cnt; i++) c.apply(new CacheDataRowAdapter(getLink(pageAddr, i))); } + + /** + * @return {@code True} if cache ID has to be stored. + */ + protected abstract boolean storeCacheId(); } /** * */ - public static final class DataLeafIO extends BPlusLeafIO<CacheSearchRow> implements RowLinkIO { - /** */ - public static final IOVersions<DataLeafIO> VERSIONS = new IOVersions<>( - new DataLeafIO(1) - ); - + private static abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> implements RowLinkIO { /** + * @param type Page type. * @param ver Page format version. + * @param itemSize Single item size on page. */ - DataLeafIO(int ver) { - super(T_DATA_REF_LEAF, ver, 12); + protected AbstractDataLeafIO(int type, int ver, int itemSize) { + super(type, ver, itemSize); } /** {@inheritDoc} */ @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) { assert row.link() != 0; - store0(pageAddr, off, row.link(), row.hash()); + PageUtils.putLong(pageAddr, off, row.link()); + PageUtils.putInt(pageAddr, off + 8, row.hash()); + + if (storeCacheId()) { + assert row.cacheId() != UNDEFINED_CACHE_ID; + + PageUtils.putInt(pageAddr, off + 12, row.cacheId()); + } } /** {@inheritDoc} */ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr, int srcIdx) { - store0(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx), getHash(srcPageAddr, srcIdx)); + int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx); + long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx); + int off = offset(dstIdx); + + PageUtils.putLong(dstPageAddr, off, link); + PageUtils.putInt(dstPageAddr, off + 8, hash); + + if (storeCacheId()) { + int cacheId = ((RowLinkIO)srcIo).getCacheId(srcPageAddr, srcIdx); + + assert cacheId != UNDEFINED_CACHE_ID; + + PageUtils.putInt(dstPageAddr, off + 12, cacheId); + } } /** {@inheritDoc} */ @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long buf, int idx) { + int cacheId = getCacheId(buf, idx); int hash = getHash(buf, idx); long link = getLink(buf, idx); - return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link); + return ((CacheDataTree)tree).rowStore.keySearchRow(cacheId, hash, link); } /** {@inheritDoc} */ @@ -1764,6 +2052,119 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple for (int i = 0; i < cnt; i++) c.apply(new CacheDataRowAdapter(getLink(pageAddr, i))); } + + /** + * @return {@code True} if cache ID has to be stored. + */ + protected abstract boolean storeCacheId(); + } + + /** + * + */ + public static final class DataInnerIO extends AbstractDataInnerIO { + /** */ + public static final IOVersions<DataInnerIO> VERSIONS = new IOVersions<>( + new DataInnerIO(1) + ); + + /** + * @param ver Page format version. + */ + DataInnerIO(int ver) { + super(T_DATA_REF_INNER, ver, true, 12); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return UNDEFINED_CACHE_ID; + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return false; + } + } + + /** + * + */ + public static final class DataLeafIO extends AbstractDataLeafIO { + /** */ + public static final IOVersions<DataLeafIO> VERSIONS = new IOVersions<>( + new DataLeafIO(1) + ); + + /** + * @param ver Page format version. + */ + DataLeafIO(int ver) { + super(T_DATA_REF_LEAF, ver, 12); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return UNDEFINED_CACHE_ID; + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return false; + } + } + + /** + * + */ + public static final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO { + /** */ + public static final IOVersions<CacheIdAwareDataInnerIO> VERSIONS = new IOVersions<>( + new CacheIdAwareDataInnerIO(1) + ); + + /** + * @param ver Page format version. + */ + CacheIdAwareDataInnerIO(int ver) { + super(T_CACHE_ID_AWARE_DATA_REF_INNER, ver, true, 16); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 12); + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return true; + } + } + + /** + * + */ + public static final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO { + /** */ + public static final IOVersions<CacheIdAwareDataLeafIO> VERSIONS = new IOVersions<>( + new CacheIdAwareDataLeafIO(1) + ); + + /** + * @param ver Page format version. + */ + CacheIdAwareDataLeafIO(int ver) { + super(T_CACHE_ID_AWARE_DATA_REF_LEAF, ver, 16); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 12); + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return true; + } } /** @@ -1776,38 +2177,47 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** Link. */ private long link; + /** Cache ID. */ + private int cacheId; + /** */ private KeyCacheObject key; /** + * Creates a new instance which represents an upper or lower bound + * inside a logical cache. + * + * @param cacheId Cache ID. + */ + public PendingRow(int cacheId) { + this.cacheId = cacheId; + } + + /** + * @param cacheId Cache ID. * @param expireTime Expire time. * @param link Link */ - PendingRow(long expireTime, long link) { + PendingRow(int cacheId, long expireTime, long link) { assert expireTime != 0; + this.cacheId = cacheId; this.expireTime = expireTime; this.link = link; } /** - * @param cctx Context. - * @param expireTime Expire time. - * @param link Link. + * @param grp Cache group. * @return Row. * @throws IgniteCheckedException If failed. */ - static PendingRow createRowWithKey(GridCacheContext cctx, long expireTime, long link) - throws IgniteCheckedException { - PendingRow row = new PendingRow(expireTime, link); - + PendingRow initKey(CacheGroupContext grp) throws IgniteCheckedException { CacheDataRowAdapter rowData = new CacheDataRowAdapter(link); + rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY); - rowData.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY); + key = rowData.key(); - row.key = rowData.key(); - - return row; + return this; } /** {@inheritDoc} */ @@ -1821,10 +2231,13 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple */ protected static class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> { /** */ - private final GridCacheContext cctx; + private final static Object WITHOUT_KEY = new Object(); + + /** */ + private final CacheGroupContext grp; /** - * @param cctx Cache context. + * @param grp Cache group. * @param name Tree name. * @param pageMem Page memory. * @param metaPageId Meta page ID. @@ -1833,7 +2246,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @throws IgniteCheckedException If failed. */ public PendingEntriesTree( - GridCacheContext cctx, + CacheGroupContext grp, String name, PageMemory pageMem, long metaPageId, @@ -1841,26 +2254,49 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple boolean initNew) throws IgniteCheckedException { super(name, - cctx.cacheId(), + grp.groupId(), pageMem, - cctx.shared().wal(), - cctx.offheap().globalRemoveId(), + grp.shared().wal(), + grp.offheap().globalRemoveId(), metaPageId, reuseList, - PendingEntryInnerIO.VERSIONS, - PendingEntryLeafIO.VERSIONS); + grp.sharedGroup() ? CacheIdAwarePendingEntryInnerIO.VERSIONS : PendingEntryInnerIO.VERSIONS, + grp.sharedGroup() ? CacheIdAwarePendingEntryLeafIO.VERSIONS : PendingEntryLeafIO.VERSIONS); - this.cctx = cctx; + this.grp = grp; initTree(initNew); } /** {@inheritDoc} */ - @Override protected int compare(BPlusIO<PendingRow> io, long pageAddr, int idx, PendingRow row) + @Override protected int compare(BPlusIO<PendingRow> iox, long pageAddr, int idx, PendingRow row) throws IgniteCheckedException { - long expireTime = ((PendingRowIO)io).getExpireTime(pageAddr, idx); + PendingRowIO io = (PendingRowIO)iox; - int cmp = Long.compare(expireTime, row.expireTime); + int cmp; + + if (grp.sharedGroup()) { + assert row.cacheId != UNDEFINED_CACHE_ID : "Cache ID is not provided!"; + assert io.getCacheId(pageAddr, idx) != UNDEFINED_CACHE_ID : "Cache ID is not stored!"; + + cmp = Integer.compare(io.getCacheId(pageAddr, idx), row.cacheId); + + if (cmp != 0) + return cmp; + + if(cmp == 0 && row.expireTime == 0 && row.link == 0) { + // A search row with a cach ID only is used as a cache bound. + // The found position will be shifted until the exact cache bound is found; + // See for details: + // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findLowerBound() + // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findUpperBound() + return cmp; + } + } + + long expireTime = io.getExpireTime(pageAddr, idx); + + cmp = Long.compare(expireTime, row.expireTime); if (cmp != 0) return cmp; @@ -1868,15 +2304,17 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (row.link == 0L) return 0; - long link = ((PendingRowIO)io).getLink(pageAddr, idx); + long link = io.getLink(pageAddr, idx); return Long.compare(link, row.link); } /** {@inheritDoc} */ - @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object ignore) + @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object flag) throws IgniteCheckedException { - return io.getLookupRow(this, pageAddr, idx); + PendingRow row = io.getLookupRow(this, pageAddr, idx); + + return flag == WITHOUT_KEY ? row : row.initKey(grp); } } @@ -1897,22 +2335,27 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @return Link. */ long getLink(long pageAddr, int idx); + + /** + * @param pageAddr Page address. + * @param idx Index. + * @return Cache ID or {@code 0} if Cache ID is not defined. + */ + int getCacheId(long pageAddr, int idx); } /** * */ - public static class PendingEntryInnerIO extends BPlusInnerIO<PendingRow> implements PendingRowIO { - /** */ - public static final IOVersions<PendingEntryInnerIO> VERSIONS = new IOVersions<>( - new PendingEntryInnerIO(1) - ); - + private static abstract class AbstractPendingEntryInnerIO extends BPlusInnerIO<PendingRow> implements PendingRowIO { /** + * @param type Page type. * @param ver Page format version. + * @param canGetRow If we can get full row from this page. + * @param itemSize Single item size on page. */ - PendingEntryInnerIO(int ver) { - super(T_PENDING_REF_INNER, ver, true, 8 + 8); + protected AbstractPendingEntryInnerIO(int type, int ver, boolean canGetRow, int itemSize) { + super(type, ver, canGetRow, itemSize); } /** {@inheritDoc} */ @@ -1922,6 +2365,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple PageUtils.putLong(pageAddr, off, row.expireTime); PageUtils.putLong(pageAddr, off + 8, row.link); + + if (storeCacheId()) { + assert row.cacheId != UNDEFINED_CACHE_ID; + + PageUtils.putInt(pageAddr, off + 16, row.cacheId); + } } /** {@inheritDoc} */ @@ -1937,14 +2386,20 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple PageUtils.putLong(dstPageAddr, dstOff, expireTime); PageUtils.putLong(dstPageAddr, dstOff + 8, link); + + if (storeCacheId()) { + int cacheId = ((PendingRowIO)srcIo).getCacheId(srcPageAddr, srcIdx); + + assert cacheId != UNDEFINED_CACHE_ID; + + PageUtils.putInt(dstPageAddr, dstOff + 16, cacheId); + } } /** {@inheritDoc} */ @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) throws IgniteCheckedException { - return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, - getExpireTime(pageAddr, idx), - getLink(pageAddr, idx)); + return new PendingRow(getCacheId(pageAddr, idx), getExpireTime(pageAddr, idx), getLink(pageAddr, idx)); } /** {@inheritDoc} */ @@ -1956,22 +2411,24 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple @Override public long getLink(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + 8); } + + /** + * @return {@code True} if cache ID has to be stored. + */ + protected abstract boolean storeCacheId(); } /** * */ - public static class PendingEntryLeafIO extends BPlusLeafIO<PendingRow> implements PendingRowIO { - /** */ - public static final IOVersions<PendingEntryLeafIO> VERSIONS = new IOVersions<>( - new PendingEntryLeafIO(1) - ); - + private static abstract class AbstractPendingEntryLeafIO extends BPlusLeafIO<PendingRow> implements PendingRowIO { /** + * @param type Page type. * @param ver Page format version. + * @param itemSize Single item size on page. */ - PendingEntryLeafIO(int ver) { - super(T_PENDING_REF_LEAF, ver, 8 + 8); + protected AbstractPendingEntryLeafIO(int type, int ver, int itemSize) { + super(type, ver, itemSize); } /** {@inheritDoc} */ @@ -1981,6 +2438,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple PageUtils.putLong(pageAddr, off, row.expireTime); PageUtils.putLong(pageAddr, off + 8, row.link); + + if (storeCacheId()) { + assert row.cacheId != UNDEFINED_CACHE_ID; + + PageUtils.putInt(pageAddr, off + 16, row.cacheId); + } } /** {@inheritDoc} */ @@ -1996,14 +2459,20 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple PageUtils.putLong(dstPageAddr, dstOff, expireTime); PageUtils.putLong(dstPageAddr, dstOff + 8, link); + + if (storeCacheId()) { + int cacheId = ((PendingRowIO)srcIo).getCacheId(srcPageAddr, srcIdx); + + assert cacheId != UNDEFINED_CACHE_ID; + + PageUtils.putInt(dstPageAddr, dstOff + 16, cacheId); + } } /** {@inheritDoc} */ @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) throws IgniteCheckedException { - return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, - getExpireTime(pageAddr, idx), - getLink(pageAddr, idx)); + return new PendingRow(getCacheId(pageAddr, idx), getExpireTime(pageAddr, idx), getLink(pageAddr, idx)); } /** {@inheritDoc} */ @@ -2015,5 +2484,118 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple @Override public long getLink(long pageAddr, int idx) { return PageUtils.getLong(pageAddr, offset(idx) + 8); } + + /** + * @return {@code True} if cache ID has to be stored. + */ + protected abstract boolean storeCacheId(); + } + + /** + * + */ + public static final class PendingEntryInnerIO extends AbstractPendingEntryInnerIO { + /** */ + public static final IOVersions<PendingEntryInnerIO> VERSIONS = new IOVersions<>( + new PendingEntryInnerIO(1) + ); + + /** + * @param ver Page format version. + */ + PendingEntryInnerIO(int ver) { + super(T_PENDING_REF_INNER, ver, true, 16); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return UNDEFINED_CACHE_ID; + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return false; + } + } + + /** + * + */ + public static final class PendingEntryLeafIO extends AbstractPendingEntryLeafIO { + /** */ + public static final IOVersions<PendingEntryLeafIO> VERSIONS = new IOVersions<>( + new PendingEntryLeafIO(1) + ); + + /** + * @param ver Page format version. + */ + PendingEntryLeafIO(int ver) { + super(T_PENDING_REF_LEAF, ver, 16); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return UNDEFINED_CACHE_ID; + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return false; + } + } + + /** + * + */ + public static final class CacheIdAwarePendingEntryInnerIO extends AbstractPendingEntryInnerIO { + /** */ + public static final IOVersions<CacheIdAwarePendingEntryInnerIO> VERSIONS = new IOVersions<>( + new CacheIdAwarePendingEntryInnerIO(1) + ); + + /** + * @param ver Page format version. + */ + CacheIdAwarePendingEntryInnerIO(int ver) { + super(T_CACHE_ID_AWARE_PENDING_REF_INNER, ver, true, 20); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 16); + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return true; + } + } + + /** + * + */ + public static final class CacheIdAwarePendingEntryLeafIO extends AbstractPendingEntryLeafIO { + /** */ + public static final IOVersions<CacheIdAwarePendingEntryLeafIO> VERSIONS = new IOVersions<>( + new CacheIdAwarePendingEntryLeafIO(1) + ); + + /** + * @param ver Page format version. + */ + CacheIdAwarePendingEntryLeafIO(int ver) { + super(T_CACHE_ID_AWARE_PENDING_REF_LEAF, ver, 20); + } + + /** {@inheritDoc} */ + @Override public int getCacheId(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 16); + } + + /** {@inheritDoc} */ + @Override protected boolean storeCacheId() { + return true; + } } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 41b3281..1cdeb3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -63,12 +63,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { /** {@inheritDoc} */ @Override public int partitions() { - CacheConfiguration ccfg = cctx.config(); - - if (ccfg == null) - throw new IgniteException(FAILED_TO_FIND_CACHE_ERR_MSG + cctx.name()); - - return ccfg.getAffinity().partitions(); + return cctx.group().affinityFunction().partitions(); } /** {@inheritDoc} */ @@ -196,7 +191,7 @@ public class GridCacheAffinityImpl<K, V> implements Affinity<K> { int nodesCnt; if (!cctx.isLocal()) - nodesCnt = cctx.discovery().cacheAffinityNodes(cctx.cacheId(), topVer).size(); + nodesCnt = cctx.discovery().cacheGroupAffinityNodes(cctx.groupId(), topVer).size(); else nodesCnt = 1; http://git-wip-us.apache.org/repos/asf/ignite/blob/db85d166/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java index e0076d5..cc26b21 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java @@ -36,11 +36,6 @@ public interface CacheDataRow extends CacheSearchRow { public GridCacheVersion version(); /** - * @return Cache id. Stored only if memory policy with configured per-page eviction is used. - */ - public int cacheId(); - - /** * @return Expire time. */ public long expireTime();