http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java index 8ec5f8f..418d28b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitExistingPageRecord.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; -import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -50,10 +50,10 @@ public class SplitExistingPageRecord extends PageDeltaRecord { } /** {@inheritDoc} */ - @Override public void applyDelta(ByteBuffer buf) throws IgniteCheckedException { - BPlusIO<?> io = PageIO.getBPlusIO(buf); + @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { + BPlusIO<?> io = PageIO.getBPlusIO(pageAddr); - io.splitExistingPage(buf, mid, fwdId); + io.splitExistingPage(pageAddr, mid, fwdId); } /** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java index b4487fa..39f2669 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/SplitForwardPageRecord.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.pagemem.PageMemory; /** * Split forward page record. @@ -79,7 +80,7 @@ public class SplitForwardPageRecord extends PageDeltaRecord { } /** {@inheritDoc} */ - @Override public void applyDelta(ByteBuffer fwdBuf) throws IgniteCheckedException { + @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { throw new IgniteCheckedException("Split forward page record should not be logged."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java index 9d00d77..7cd0948 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/TrackingPageDeltaRecord.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; -import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.processors.cache.database.tree.io.TrackingPageIO; /** @@ -70,8 +70,12 @@ public class TrackingPageDeltaRecord extends PageDeltaRecord { } /** {@inheritDoc} */ - @Override public void applyDelta(ByteBuffer buf) throws IgniteCheckedException { - TrackingPageIO.VERSIONS.forPage(buf).markChanged(buf, pageIdToMark, nextSnapshotId, lastSuccessfulSnapshotId, buf.capacity()); + @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { + TrackingPageIO.VERSIONS.forPage(pageAddr).markChanged(pageMem.pageBuffer(pageAddr), + pageIdToMark, + nextSnapshotId, + lastSuccessfulSnapshotId, + pageMem.pageSize()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java index 92b72ce..c226ba2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java @@ -68,6 +68,13 @@ public interface CacheObject extends Message { public boolean putValue(ByteBuffer buf) throws IgniteCheckedException; /** + * @param addr Address tp write value to. + * @return Number of bytes written. + * @throws IgniteCheckedException If failed. + */ + public int putValue(long addr) throws IgniteCheckedException; + + /** * @param buf Buffer to write value to. * @param off Offset in source binary data. * @param len Length of the data to write. http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index 1f13c6f..688b92f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -78,6 +79,35 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable } /** {@inheritDoc} */ + @Override public int putValue(long addr) throws IgniteCheckedException { + assert valBytes != null : "Value bytes must be initialized before object is stored"; + + return putValue(addr, cacheObjectType(), valBytes, 0); + } + + /** + * @param addr Write address. + * @param type Object type. + * @param valBytes Value bytes array. + * @param valOff Value bytes array offset. + * @return + */ + public static int putValue(long addr, byte type, byte[] valBytes, int valOff) { + int off = 0; + + PageUtils.putInt(addr, off, valBytes.length); + off += 4; + + PageUtils.putByte(addr, off, type); + off++; + + PageUtils.putBytes(addr, off, valBytes, valOff); + off += valBytes.length - valOff; + + return off; + } + + /** {@inheritDoc} */ @Override public boolean putValue(final ByteBuffer buf, int off, int len) throws IgniteCheckedException { assert valBytes != null : "Value bytes must be initialized before object is stored"; @@ -167,8 +197,14 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable * @return {@code True} if data were successfully written. * @throws IgniteCheckedException If failed. */ - public static boolean putValue(byte cacheObjType, final ByteBuffer buf, int off, int len, - byte[] valBytes, final int start) throws IgniteCheckedException { + public static boolean putValue(byte cacheObjType, + final ByteBuffer buf, + int off, + int len, + byte[] valBytes, + final int start) + throws IgniteCheckedException + { int dataLen = valBytes.length; if (buf.remaining() < len) http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java index b3a4117..eee6fcc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java @@ -82,6 +82,11 @@ public class CacheObjectByteArrayImpl implements CacheObject, Externalizable { } /** {@inheritDoc} */ + @Override public int putValue(long addr) throws IgniteCheckedException { + return CacheObjectAdapter.putValue(addr, cacheObjectType(), val, 0); + } + + /** {@inheritDoc} */ @Override public boolean putValue(final ByteBuffer buf, int off, int len) throws IgniteCheckedException { assert val != null : "Value is not initialized"; http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 76450fb..f7e46d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; @@ -31,11 +30,14 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.FullPageId; +import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter; +import org.apache.ignite.internal.processors.cache.database.CacheSearchRow; import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.database.RootPage; import org.apache.ignite.internal.processors.cache.database.RowStore; @@ -44,6 +46,8 @@ import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; @@ -69,6 +73,8 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId; +import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId; /** * @@ -880,7 +886,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple CacheObject val, GridCacheVersion ver, long expireTime) throws IgniteCheckedException { - DataRow dataRow = new DataRow(key.hashCode(), key, val, ver, p, expireTime); + DataRow dataRow = new DataRow(key, val, ver, p, expireTime); // Make sure value bytes initialized. key.valueBytes(cctx.cacheObjectContext()); @@ -894,7 +900,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple assert dataRow.link() != 0 : dataRow; - DataRow old = dataTree.put(dataRow); + CacheDataRow old = dataTree.put(dataRow); if (old == null) storageSize.incrementAndGet(); @@ -933,7 +939,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple throw new NodeStoppingException("Operation has been cancelled (node is stopping)."); try { - DataRow dataRow = dataTree.remove(new KeySearchRow(key.hashCode(), key, 0)); + CacheDataRow dataRow = dataTree.remove(new SearchRow(key)); CacheObject val = null; GridCacheVersion ver = null; @@ -970,7 +976,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException { - return dataTree.findOne(new KeySearchRow(key.hashCode(), key, 0)); + return dataTree.findOne(new SearchRow(key)); } /** {@inheritDoc} */ @@ -981,14 +987,14 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** {@inheritDoc} */ @Override public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower, KeyCacheObject upper) throws IgniteCheckedException { - KeySearchRow lowerRow = null; - KeySearchRow upperRow = null; + SearchRow lowerRow = null; + SearchRow upperRow = null; if (lower != null) - lowerRow = new KeySearchRow(lower.hashCode(), lower, 0); + lowerRow = new SearchRow(lower); if (upper != null) - upperRow = new KeySearchRow(upper.hashCode(), upper, 0); + upperRow = new SearchRow(upper); return dataTree.find(lowerRow, upperRow); } @@ -1034,82 +1040,81 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** * */ - private class KeySearchRow extends CacheDataRowAdapter { + private static class SearchRow implements CacheSearchRow { /** */ - protected int hash; + private final KeyCacheObject key; + + /** */ + private final int hash; /** - * @param hash Hash code. * @param key Key. - * @param link Link. */ - KeySearchRow(int hash, KeyCacheObject key, long link) { - super(link); - + SearchRow(KeyCacheObject key) { this.key = key; - this.hash = hash; - } - /** - * Init data. - * - * @param keyOnly Initialize only key. - */ - protected final void initData(boolean keyOnly) { - if (key != null) - return; - - assert link() != 0; - - try { - initFromLink(cctx, keyOnly); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e.getMessage(), e); - } + hash = key.hashCode(); } - /** - * @return Key. - */ + /** {@inheritDoc} */ @Override public KeyCacheObject key() { - initData(true); - return key; } + + /** {@inheritDoc} */ + @Override public long link() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public int hash() { + return hash; + } } /** * */ - private class DataRow extends KeySearchRow { + private class DataRow extends CacheDataRowAdapter { /** */ protected int part = -1; + /** */ + protected int hash; + /** * @param hash Hash code. * @param link Link. + * @param keyOnly If {@code true} initializes only key. */ - DataRow(int hash, long link) { - super(hash, null, link); + DataRow(int hash, long link, boolean keyOnly) { + super(link); + + this.hash = hash; part = PageIdUtils.partId(link); - // We can not init data row lazily because underlying buffer can be concurrently cleared. - initData(false); + try { + // We can not init data row lazily because underlying buffer can be concurrently cleared. + initFromLink(cctx, keyOnly); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } /** - * @param hash Hash code. * @param key Key. * @param val Value. * @param ver Version. * @param part Partition. * @param expireTime Expire time. */ - DataRow(int hash, KeyCacheObject key, CacheObject val, GridCacheVersion ver, int part, long expireTime) { - super(hash, key, 0); + DataRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, int part, long expireTime) { + super(0); + this.hash = key.hashCode(); + this.key = key; this.val = val; this.ver = ver; this.part = part; @@ -1122,6 +1127,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ + @Override public int hash() { + return hash; + } + + /** {@inheritDoc} */ @Override public void link(long link) { this.link = link; } @@ -1130,7 +1140,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple /** * */ - protected static class CacheDataTree extends BPlusTree<KeySearchRow, DataRow> { + protected static class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { /** */ private final CacheDataRowStore rowStore; @@ -1169,38 +1179,76 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override protected int compare(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx, KeySearchRow row) + @Override protected int compare(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, CacheSearchRow row) throws IgniteCheckedException { - int hash = ((RowLinkIO)io).getHash(buf, idx); + int hash = ((RowLinkIO)io).getHash(pageAddr, idx); - int cmp = Integer.compare(hash, row.hash); + int cmp = Integer.compare(hash, row.hash()); if (cmp != 0) return cmp; - KeySearchRow row0 = io.getLookupRow(this, buf, idx); + long link = ((RowLinkIO)io).getLink(pageAddr, idx); - return compareKeys(row0.key(), row.key()); - } + assert row.key() != null : row; - /** {@inheritDoc} */ - @Override protected DataRow getRow(BPlusIO<KeySearchRow> io, ByteBuffer buf, int idx) - throws IgniteCheckedException { - int hash = ((RowLinkIO)io).getHash(buf, idx); - long link = ((RowLinkIO)io).getLink(buf, idx); - - return rowStore.dataRow(hash, link); + return compareKeys(row.key(), link); } /** - * @param key1 First key. - * @param key2 Second key. + * @param key Key. + * @param link Link. * @return Compare result. * @throws IgniteCheckedException If failed. */ - private int compareKeys(CacheObject key1, CacheObject key2) throws IgniteCheckedException { - byte[] bytes1 = key1.valueBytes(cctx.cacheObjectContext()); - byte[] bytes2 = key2.valueBytes(cctx.cacheObjectContext()); + private int compareKeys(KeyCacheObject key, final long link) throws IgniteCheckedException { + byte[] bytes = key.valueBytes(cctx.cacheObjectContext()); + + PageMemory pageMem = cctx.shared().database().pageMemory(); + + try (Page page = page(pageId(link))) { + long pageAddr = page.getForReadPointer(); // Non-empty data page must not be recycled. + + assert pageAddr != 0L : link; + + try { + DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + + DataPagePayload data = io.readPayload(pageAddr, + itemId(link), + pageMem.pageSize()); + + if (data.nextLink() == 0) { + long addr = pageAddr + data.offset(); + + int len = PageUtils.getInt(addr, 0); + + int size = Math.min(bytes.length, len); + + addr += 5; // Skip length and type byte. + + for (int i = 0; i < size; i++) { + byte b1 = PageUtils.getByte(addr, i); + byte b2 = bytes[i]; + + if (b1 != b2) + return b1 > b2 ? 1 : -1; + } + + return Integer.compare(len, bytes.length); + } + } + finally { + page.releaseRead(); + } + } + + // TODO GG-11768. + CacheDataRowAdapter other = new CacheDataRowAdapter(link); + other.initFromLink(cctx, true); + + byte[] bytes1 = other.key().valueBytes(cctx.cacheObjectContext()); + byte[] bytes2 = key.valueBytes(cctx.cacheObjectContext()); int len = Math.min(bytes1.length, bytes2.length); @@ -1214,6 +1262,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple return Integer.compare(bytes1.length, bytes2.length); } + + /** {@inheritDoc} */ + @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx) + throws IgniteCheckedException { + int hash = ((RowLinkIO)io).getHash(pageAddr, idx); + long link = ((RowLinkIO)io).getLink(pageAddr, idx); + + return rowStore.dataRow(hash, link); + } } /** @@ -1233,8 +1290,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @param link Link. * @return Search row. */ - private KeySearchRow keySearchRow(int hash, long link) { - return new KeySearchRow(hash, null, link); + private CacheSearchRow keySearchRow(int hash, long link) { + return new DataRow(hash, link, true); } /** @@ -1242,8 +1299,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple * @param link Link. * @return Data row. */ - private DataRow dataRow(int hash, long link) { - return new DataRow(hash, link); + private CacheDataRow dataRow(int hash, long link) { + return new DataRow(hash, link, false); } } @@ -1259,28 +1316,39 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** + * @param pageAddr Page address. + * @param off Offset. + * @param link Link. + * @param hash Hash. + */ + private static void store0(long pageAddr, int off, long link, int hash) { + PageUtils.putLong(pageAddr, off, link); + PageUtils.putInt(pageAddr, off + 8, hash); + } + + /** * */ private interface RowLinkIO { /** - * @param buf Buffer. + * @param pageAddr Page address. * @param idx Index. * @return Row link. */ - public long getLink(ByteBuffer buf, int idx); + public long getLink(long pageAddr, int idx); /** - * @param buf Buffer. + * @param pageAddr Page address. * @param idx Index. * @return Key hash code. */ - public int getHash(ByteBuffer buf, int idx); + public int getHash(long pageAddr, int idx); } /** * */ - public static final class DataInnerIO extends BPlusInnerIO<KeySearchRow> implements RowLinkIO { + public static final class DataInnerIO extends BPlusInnerIO<CacheSearchRow> implements RowLinkIO { /** */ public static final IOVersions<DataInnerIO> VERSIONS = new IOVersions<>( new DataInnerIO(1) @@ -1294,46 +1362,53 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public void storeByOffset(ByteBuffer buf, int off, KeySearchRow row) { + @Override public void storeByOffset(ByteBuffer buf, int off, CacheSearchRow row) throws IgniteCheckedException { assert row.link() != 0; - store0(buf, off, row.link(), row.hash); + store0(buf, off, row.link(), row.hash()); } /** {@inheritDoc} */ - @Override public KeySearchRow getLookupRow(BPlusTree<KeySearchRow, ?> tree, ByteBuffer buf, int idx) { - int hash = getHash(buf, idx); - long link = getLink(buf, idx); + @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) { + assert row.link() != 0; + + store0(pageAddr, off, row.link(), row.hash()); + } + + /** {@inheritDoc} */ + @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long pageAddr, int idx) { + int hash = getHash(pageAddr, idx); + long link = getLink(pageAddr, idx); return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link); } /** {@inheritDoc} */ - @Override public void store(ByteBuffer dst, int dstIdx, BPlusIO<KeySearchRow> srcIo, ByteBuffer src, + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr, int srcIdx) { - int hash = ((RowLinkIO)srcIo).getHash(src, srcIdx); - long link = ((RowLinkIO)srcIo).getLink(src, srcIdx); + int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx); + long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx); - store0(dst, offset(dstIdx), link, hash); + store0(dstPageAddr, offset(dstIdx), link, hash); } /** {@inheritDoc} */ - @Override public long getLink(ByteBuffer buf, int idx) { - assert idx < getCount(buf) : idx; + @Override public long getLink(long pageAddr, int idx) { + assert idx < getCount(pageAddr) : idx; - return buf.getLong(offset(idx)); + return PageUtils.getLong(pageAddr, offset(idx)); } /** {@inheritDoc} */ - @Override public int getHash(ByteBuffer buf, int idx) { - return buf.getInt(offset(idx) + 8); + @Override public int getHash(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 8); } } /** * */ - public static final class DataLeafIO extends BPlusLeafIO<KeySearchRow> implements RowLinkIO { + public static final class DataLeafIO extends BPlusLeafIO<CacheSearchRow> implements RowLinkIO { /** */ public static final IOVersions<DataLeafIO> VERSIONS = new IOVersions<>( new DataLeafIO(1) @@ -1347,20 +1422,27 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public void storeByOffset(ByteBuffer buf, int off, KeySearchRow row) { + @Override public void storeByOffset(ByteBuffer buf, int off, CacheSearchRow row) throws IgniteCheckedException { + assert row.link() != 0; + + store0(buf, off, row.link(), row.hash()); + } + + /** {@inheritDoc} */ + @Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) { assert row.link() != 0; - store0(buf, off, row.link(), row.hash); + store0(pageAddr, off, row.link(), row.hash()); } /** {@inheritDoc} */ - @Override public void store(ByteBuffer dst, int dstIdx, BPlusIO<KeySearchRow> srcIo, ByteBuffer src, + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr, int srcIdx) { - store0(dst, offset(dstIdx), getLink(src, srcIdx), getHash(src, srcIdx)); + store0(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx), getHash(srcPageAddr, srcIdx)); } /** {@inheritDoc} */ - @Override public KeySearchRow getLookupRow(BPlusTree<KeySearchRow, ?> tree, ByteBuffer buf, int idx) { + @Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long buf, int idx) { int hash = getHash(buf, idx); long link = getLink(buf, idx); @@ -1368,15 +1450,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public long getLink(ByteBuffer buf, int idx) { - assert idx < getCount(buf) : idx; + @Override public long getLink(long pageAddr, int idx) { + assert idx < getCount(pageAddr) : idx; - return buf.getLong(offset(idx)); + return PageUtils.getLong(pageAddr, offset(idx)); } /** {@inheritDoc} */ - @Override public int getHash(ByteBuffer buf, int idx) { - return buf.getInt(offset(idx) + 8); + @Override public int getHash(long pageAddr, int idx) { + return PageUtils.getInt(pageAddr, offset(idx) + 8); } } @@ -1471,9 +1553,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override protected int compare(BPlusIO<PendingRow> io, ByteBuffer buf, int idx, PendingRow row) + @Override protected int compare(BPlusIO<PendingRow> io, long pageAddr, int idx, PendingRow row) throws IgniteCheckedException { - long expireTime = ((PendingRowIO)io).getExpireTime(buf, idx); + long expireTime = ((PendingRowIO)io).getExpireTime(pageAddr, idx); int cmp = Long.compare(expireTime, row.expireTime); @@ -1483,15 +1565,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple if (row.link == 0L) return 0; - long link = ((PendingRowIO)io).getLink(buf, idx); + long link = ((PendingRowIO)io).getLink(pageAddr, idx); return Long.compare(link, row.link); } /** {@inheritDoc} */ - @Override protected PendingRow getRow(BPlusIO<PendingRow> io, ByteBuffer buf, int idx) + @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx) throws IgniteCheckedException { - return io.getLookupRow(this, buf, idx); + return io.getLookupRow(this, pageAddr, idx); } } @@ -1500,18 +1582,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple */ private interface PendingRowIO { /** - * @param buf Buffer. + * @param pageAddr Page address. * @param idx Index. * @return Expire time. */ - long getExpireTime(ByteBuffer buf, int idx); + long getExpireTime(long pageAddr, int idx); /** - * @param buf Buffer. + * @param pageAddr Page address. * @param idx Index. * @return Link. */ - long getLink(ByteBuffer buf, int idx); + long getLink(long pageAddr, int idx); } /** @@ -1540,34 +1622,45 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public void store(ByteBuffer dst, + @Override public void storeByOffset(long pageAddr, int off, PendingRow row) throws IgniteCheckedException { + assert row.link != 0; + assert row.expireTime != 0; + + PageUtils.putLong(pageAddr, off, row.expireTime); + PageUtils.putLong(pageAddr, off + 8, row.link); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<PendingRow> srcIo, - ByteBuffer src, + long srcPageAddr, int srcIdx) throws IgniteCheckedException { int dstOff = offset(dstIdx); - long link = ((PendingRowIO)srcIo).getLink(src, srcIdx); - long expireTime = ((PendingRowIO)srcIo).getExpireTime(src, srcIdx); + long link = ((PendingRowIO)srcIo).getLink(srcPageAddr, srcIdx); + long expireTime = ((PendingRowIO)srcIo).getExpireTime(srcPageAddr, srcIdx); - dst.putLong(dstOff, expireTime); - dst.putLong(dstOff + 8, link); + PageUtils.putLong(dstPageAddr, dstOff, expireTime); + PageUtils.putLong(dstPageAddr, dstOff + 8, link); } /** {@inheritDoc} */ - @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, ByteBuffer buf, int idx) + @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) throws IgniteCheckedException { - return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, getExpireTime(buf, idx), getLink(buf, idx)); + return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, + getExpireTime(pageAddr, idx), + getLink(pageAddr, idx)); } /** {@inheritDoc} */ - @Override public long getExpireTime(ByteBuffer buf, int idx) { - return buf.getLong(offset(idx)); + @Override public long getExpireTime(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx)); } /** {@inheritDoc} */ - @Override public long getLink(ByteBuffer buf, int idx) { - return buf.getLong(offset(idx) + 8); + @Override public long getLink(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + 8); } } @@ -1597,34 +1690,45 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ - @Override public void store(ByteBuffer dst, + @Override public void storeByOffset(long pageAddr, int off, PendingRow row) throws IgniteCheckedException { + assert row.link != 0; + assert row.expireTime != 0; + + PageUtils.putLong(pageAddr, off, row.expireTime); + PageUtils.putLong(pageAddr, off + 8, row.link); + } + + /** {@inheritDoc} */ + @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<PendingRow> srcIo, - ByteBuffer src, + long srcPageAddr, int srcIdx) throws IgniteCheckedException { int dstOff = offset(dstIdx); - long link = ((PendingRowIO)srcIo).getLink(src, srcIdx); - long expireTime = ((PendingRowIO)srcIo).getExpireTime(src, srcIdx); + long link = ((PendingRowIO)srcIo).getLink(srcPageAddr, srcIdx); + long expireTime = ((PendingRowIO)srcIo).getExpireTime(srcPageAddr, srcIdx); - dst.putLong(dstOff, expireTime); - dst.putLong(dstOff + 8, link); + PageUtils.putLong(dstPageAddr, dstOff, expireTime); + PageUtils.putLong(dstPageAddr, dstOff + 8, link); } /** {@inheritDoc} */ - @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, ByteBuffer buf, int idx) + @Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx) throws IgniteCheckedException { - return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, getExpireTime(buf, idx), getLink(buf, idx)); + return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx, + getExpireTime(pageAddr, idx), + getLink(pageAddr, idx)); } /** {@inheritDoc} */ - @Override public long getExpireTime(ByteBuffer buf, int idx) { - return buf.getLong(offset(idx)); + @Override public long getExpireTime(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx)); } /** {@inheritDoc} */ - @Override public long getLink(ByteBuffer buf, int idx) { - return buf.getLong(offset(idx) + 8); + @Override public long getLink(long pageAddr, int idx) { + return PageUtils.getLong(pageAddr, offset(idx) + 8); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java index d4d7020..75ab8e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java @@ -18,18 +18,12 @@ package org.apache.ignite.internal.processors.cache.database; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; /** * Cache data row. */ -public interface CacheDataRow { - /** - * @return Cache key. - */ - public KeyCacheObject key(); - +public interface CacheDataRow extends CacheSearchRow { /** * @return Cache value. */ @@ -51,11 +45,6 @@ public interface CacheDataRow { public int partition(); /** - * @return Link for this row. - */ - public long link(); - - /** * @param link Link for this row. */ public void link(long link); http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index b5babc4..5288aad 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -21,14 +21,17 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IncompleteCacheObject; import org.apache.ignite.internal.processors.cache.IncompleteObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -88,20 +91,26 @@ public class CacheDataRowAdapter implements CacheDataRow { boolean first = true; do { + PageMemory pageMem = cctx.shared().database().pageMemory(); + try (Page page = page(pageId(nextLink), cctx)) { - ByteBuffer buf = page.getForRead(); // Non-empty data page must not be recycled. + long pageAddr = page.getForReadPointer(); // Non-empty data page must not be recycled. - assert buf != null: nextLink; + assert pageAddr != 0L : nextLink; try { - DataPageIO io = DataPageIO.VERSIONS.forPage(buf); + DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); - nextLink = io.setPositionAndLimitOnPayload(buf, itemId(nextLink)); + DataPagePayload data = io.readPayload(pageAddr, + itemId(nextLink), + pageMem.pageSize()); + + nextLink = data.nextLink(); if (first) { if (nextLink == 0) { // Fast path for a single page row. - readFullRow(coctx, buf, keyOnly); + readFullRow(coctx, pageAddr + data.offset(), keyOnly); return; } @@ -109,6 +118,11 @@ public class CacheDataRowAdapter implements CacheDataRow { first = false; } + ByteBuffer buf = pageMem.pageBuffer(pageAddr); + + buf.position(data.offset()); + buf.limit(data.offset() + data.payloadSize()); + incomplete = readFragment(coctx, buf, keyOnly, incomplete); if (keyOnly && key != null) @@ -121,7 +135,7 @@ public class CacheDataRowAdapter implements CacheDataRow { } while(nextLink != 0); - assert isReady(): "ready"; + assert isReady() : "ready"; } /** @@ -130,6 +144,7 @@ public class CacheDataRowAdapter implements CacheDataRow { * @param keyOnly {@code true} If need to read only key object. * @param incomplete Incomplete object. * @throws IgniteCheckedException If failed. + * @return Read object. */ private IncompleteObject<?> readFragment( CacheObjectContext coctx, @@ -175,12 +190,23 @@ public class CacheDataRowAdapter implements CacheDataRow { /** * @param coctx Cache object context. - * @param buf Buffer. + * @param addr Address. * @param keyOnly {@code true} If need to read only key object. * @throws IgniteCheckedException If failed. */ - private void readFullRow(CacheObjectContext coctx, ByteBuffer buf, boolean keyOnly) throws IgniteCheckedException { - key = coctx.processor().toKeyCacheObject(coctx, buf); + private void readFullRow(CacheObjectContext coctx, long addr, boolean keyOnly) throws IgniteCheckedException { + int off = 0; + + int len = PageUtils.getInt(addr, off); + off += 4; + + byte type = PageUtils.getByte(addr, off); + off++; + + byte[] bytes = PageUtils.getBytes(addr, off, len); + off += len; + + key = coctx.processor().toKeyCacheObject(coctx, type, bytes); if (keyOnly) { assert key != null: "key"; @@ -188,9 +214,22 @@ public class CacheDataRowAdapter implements CacheDataRow { return; } - val = coctx.processor().toCacheObject(coctx, buf); - ver = CacheVersionIO.read(buf, false); - expireTime = buf.getLong(); + len = PageUtils.getInt(addr, off); + off += 4; + + type = PageUtils.getByte(addr, off); + off++; + + bytes = PageUtils.getBytes(addr, off, len); + off += len; + + val = coctx.processor().toCacheObject(coctx, type, bytes); + + ver = CacheVersionIO.read(addr + off, false); + + off += CacheVersionIO.size(ver, false); + + expireTime = PageUtils.getLong(addr, off); assert isReady(): "ready"; } @@ -249,6 +288,7 @@ public class CacheDataRowAdapter implements CacheDataRow { * @param buf Buffer. * @param incomplete Incomplete object. * @return Incomplete object. + * @throws IgniteCheckedException If failed. */ private IncompleteObject<?> readIncompleteExpireTime( ByteBuffer buf, @@ -292,6 +332,7 @@ public class CacheDataRowAdapter implements CacheDataRow { * @param buf Buffer. * @param incomplete Incomplete object. * @return Incomplete object. + * @throws IgniteCheckedException If failed. */ private IncompleteObject<?> readIncompleteVersion( ByteBuffer buf, @@ -385,6 +426,11 @@ public class CacheDataRowAdapter implements CacheDataRow { } /** {@inheritDoc} */ + @Override public int hash() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheDataRowAdapter.class, this, "link", U.hexLong(link)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java new file mode 100644 index 0000000..d51cf0e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.database; + +import org.apache.ignite.internal.processors.cache.KeyCacheObject; + +/** + * + */ +public interface CacheSearchRow { + /** + * @return Cache key. + */ + public KeyCacheObject key(); + + /** + * @return Link for this row. + */ + public long link(); + + /** + * @return Key hash code. + */ + public int hash(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java index 5fd64b0..f47a697 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache.database; -import java.nio.ByteBuffer; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.IgniteCheckedException; @@ -131,35 +130,35 @@ public abstract class DataStructure implements PageLockListener { /** * @param page Page. - * @return Buffer. + * @return Page address. */ - protected final ByteBuffer tryWriteLock(Page page) { + protected final long tryWriteLock(Page page) { return PageHandler.writeLock(page, this, true); } /** * @param page Page. - * @return Buffer. + * @return Page address. */ - protected final ByteBuffer writeLock(Page page) { + protected final long writeLock(Page page) { return PageHandler.writeLock(page, this, false); } /** * @param page Page. - * @param buf Buffer. + * @param pageAddr Page address. * @param dirty Dirty page. */ - protected final void writeUnlock(Page page, ByteBuffer buf, boolean dirty) { - PageHandler.writeUnlock(page, buf, this, dirty); + protected final void writeUnlock(Page page, long pageAddr, boolean dirty) { + PageHandler.writeUnlock(page, pageAddr, this, dirty); } /** * @param page Page. - * @return Buffer. + * @return Page address. */ - protected final ByteBuffer readLock(Page page) { + protected final long readLock(Page page) { return PageHandler.readLock(page, this); } @@ -167,22 +166,29 @@ public abstract class DataStructure implements PageLockListener { * @param page Page. * @param buf Buffer. */ - protected final void readUnlock(Page page, ByteBuffer buf) { + protected final void readUnlock(Page page, long buf) { PageHandler.readUnlock(page, buf, this); } + /** + * @return Page size. + */ + protected final int pageSize() { + return pageMem.pageSize(); + } + /** {@inheritDoc} */ @Override public void onBeforeWriteLock(Page page) { // No-op. } /** {@inheritDoc} */ - @Override public void onWriteLock(Page page, ByteBuffer buf) { + @Override public void onWriteLock(Page page, long pageAddr) { // No-op. } /** {@inheritDoc} */ - @Override public void onWriteUnlock(Page page, ByteBuffer buf) { + @Override public void onWriteUnlock(Page page, long pageAddr) { // No-op. } @@ -192,12 +198,12 @@ public abstract class DataStructure implements PageLockListener { } /** {@inheritDoc} */ - @Override public void onReadLock(Page page, ByteBuffer buf) { + @Override public void onReadLock(Page page, long pageAddr) { // No-op. } /** {@inheritDoc} */ - @Override public void onReadUnlock(Page page, ByteBuffer buf) { + @Override public void onReadUnlock(Page page, long pageAddr) { // No-op. } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 18b3a1f..9c10057 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -256,7 +256,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap true, sizes); - return new PageMemoryNoStoreImpl(log, memProvider, cctx, dbCfg.getPageSize()); + return new PageMemoryNoStoreImpl(log, memProvider, cctx, dbCfg.getPageSize(), false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java index 26151ac..cf6decb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO; @@ -31,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO; import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -190,19 +192,19 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override protected int compare(final BPlusIO<IndexItem> io, final ByteBuffer buf, final int idx, + @Override protected int compare(final BPlusIO<IndexItem> io, final long pageAddr, final int idx, final IndexItem row) throws IgniteCheckedException { final int off = ((IndexIO)io).getOffset(idx); int shift = 0; // Compare index names. - final byte len = buf.get(off + shift); + final byte len = PageUtils.getByte(pageAddr, off + shift); shift += BYTE_LEN; for (int i = 0; i < len && i < row.idxName.length; i++) { - final int cmp = Byte.compare(buf.get(off + i + shift), row.idxName[i]); + final int cmp = Byte.compare(PageUtils.getByte(pageAddr, off + i + shift), row.idxName[i]); if (cmp != 0) return cmp; @@ -212,9 +214,9 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final ByteBuffer buf, + @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr, final int idx) throws IgniteCheckedException { - return readRow(buf, ((IndexIO)io).getOffset(idx)); + return readRow(pageAddr, ((IndexIO)io).getOffset(idx)); } } @@ -275,78 +277,78 @@ public class MetadataStorage implements MetaStore { } /** - * Copy row data. + * Store row to buffer. * - * @param dst Destination buffer. - * @param dstOff Destination buf offset. - * @param src Source buffer. - * @param srcOff Src buf offset. + * @param pageAddr Page address. + * @param off Offset in buf. + * @param row Row to store. */ private static void storeRow( - final ByteBuffer dst, - final int dstOff, - final ByteBuffer src, - final int srcOff + final long pageAddr, + int off, + final IndexItem row ) { - int srcOrigPos = src.position(); - int dstOrigPos = dst.position(); - - try { - src.position(srcOff); - dst.position(dstOff); - - // Index name length. - final byte len = src.get(); + // Index name length. + PageUtils.putByte(pageAddr, off, (byte)row.idxName.length); + off++; - dst.put(len); + // Index name. + PageUtils.putBytes(pageAddr, off, row.idxName); + off += row.idxName.length; - int lim = src.limit(); + // Page ID. + PageUtils.putLong(pageAddr, off, row.pageId); + } - src.limit(src.position() + len); + /** + * Copy row data. + * + * @param dstPageAddr Destination page address. + * @param dstOff Destination buf offset. + * @param srcPageAddr Source page address. + * @param srcOff Src buf offset. + */ + private static void storeRow( + final long dstPageAddr, + int dstOff, + final long srcPageAddr, + int srcOff + ) { + // Index name length. + final byte len = PageUtils.getByte(srcPageAddr, srcOff); + srcOff++; - // Index name. - dst.put(src); + PageUtils.putByte(dstPageAddr, dstOff, len); + dstOff++; - src.limit(lim); + PageHandler.copyMemory(srcPageAddr, srcOff, dstPageAddr, dstOff, len); + srcOff += len; + dstOff += len; - // Page ID. - dst.putLong(src.getLong()); - } - finally { - src.position(srcOrigPos); - dst.position(dstOrigPos); - } + // Page ID. + PageUtils.putLong(dstPageAddr, dstOff, PageUtils.getLong(srcPageAddr, srcOff)); } /** * Read row from buffer. * - * @param buf Buffer to read. - * @param off Offset in buf. + * @param pageAddr Page address. + * @param off Offset. * @return Read row. */ - private static IndexItem readRow(final ByteBuffer buf, final int off) { - int origOff = buf.position(); - - try { - buf.position(off); - - // Index name length. - final int len = buf.get() & 0xFF; - - // Index name. - final byte[] idxName = new byte[len]; + private static IndexItem readRow(final long pageAddr, int off) { + // Index name length. + final int len = PageUtils.getByte(pageAddr, off) & 0xFF; + off++; - buf.get(idxName); + // Index name. + final byte[] idxName = PageUtils.getBytes(pageAddr, off, len); + off += len; - // Page ID. - final long pageId = buf.getLong(); + // Page ID. + final long pageId = PageUtils.getLong(pageAddr, off); - return new IndexItem(idxName, pageId); - } - finally { - buf.position(origOff); - } + return new IndexItem(idxName, pageId); } /** @@ -383,16 +385,21 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override public void store(final ByteBuffer dst, final int dstIdx, final BPlusIO<IndexItem> srcIo, - final ByteBuffer src, + @Override public void storeByOffset(long pageAddr, int off, IndexItem row) throws IgniteCheckedException { + storeRow(pageAddr, off, row); + } + + /** {@inheritDoc} */ + @Override public void store(final long dstPageAddr, final int dstIdx, final BPlusIO<IndexItem> srcIo, + final long srcPageAddr, final int srcIdx) throws IgniteCheckedException { - storeRow(dst, offset(dstIdx), src, ((IndexIO)srcIo).getOffset(srcIdx)); + storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcIdx)); } /** {@inheritDoc} */ - @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final ByteBuffer buf, + @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final long pageAddr, final int idx) throws IgniteCheckedException { - return readRow(buf, offset(idx)); + return readRow(pageAddr, offset(idx)); } /** {@inheritDoc} */ @@ -424,16 +431,24 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override public void store(final ByteBuffer dst, final int dstIdx, final BPlusIO<IndexItem> srcIo, - final ByteBuffer src, + @Override public void storeByOffset(long buf, int off, IndexItem row) throws IgniteCheckedException { + storeRow(buf, off, row); + } + + /** {@inheritDoc} */ + @Override public void store(final long dstPageAddr, + final int dstIdx, + final BPlusIO<IndexItem> srcIo, + final long srcPageAddr, final int srcIdx) throws IgniteCheckedException { - storeRow(dst, offset(dstIdx), src, ((IndexIO)srcIo).getOffset(srcIdx)); + storeRow(dstPageAddr, offset(dstIdx), srcPageAddr, ((IndexIO)srcIo).getOffset(srcIdx)); } /** {@inheritDoc} */ - @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final ByteBuffer buf, + @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, + final long pageAddr, final int idx) throws IgniteCheckedException { - return readRow(buf, offset(idx)); + return readRow(pageAddr, offset(idx)); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/7db65ddd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java index 6a29027..6c1b21b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.cache.database.freelist; -import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; @@ -31,6 +31,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseBag; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; @@ -72,26 +73,26 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { /** */ private final PageHandler<CacheDataRow, Integer> writeRow = new PageHandler<CacheDataRow, Integer>() { - @Override public Integer run(Page page, PageIO iox, ByteBuffer buf, CacheDataRow row, int written) + @Override public Integer run(Page page, PageIO iox, long pageAddr, CacheDataRow row, int written) throws IgniteCheckedException { DataPageIO io = (DataPageIO)iox; int rowSize = getRowSize(row); - int oldFreeSpace = io.getFreeSpace(buf); + int oldFreeSpace = io.getFreeSpace(pageAddr); assert oldFreeSpace > 0 : oldFreeSpace; // If the full row does not fit into this page write only a fragment. - written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(page, buf, io, row, rowSize): - addRowFragment(page, buf, io, row, written, rowSize); + written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(page, pageAddr, io, row, rowSize): + addRowFragment(page, pageAddr, io, row, written, rowSize); // Reread free space after update. - int newFreeSpace = io.getFreeSpace(buf); + int newFreeSpace = io.getFreeSpace(pageAddr); if (newFreeSpace > MIN_PAGE_FREE_SPACE) { int bucket = bucket(newFreeSpace, false); - put(null, page, buf, bucket); + put(null, page, pageAddr, bucket); } // Avoid boxing with garbage generation for usual case. @@ -109,24 +110,22 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private int addRow( Page page, - ByteBuffer buf, + long buf, DataPageIO io, CacheDataRow row, int rowSize ) throws IgniteCheckedException { - // TODO: context parameter. - io.addRow(buf, row, rowSize); + io.addRow(buf, row, rowSize, pageSize()); if (isWalDeltaRecordNeeded(wal, page)) { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[rowSize]; - io.setPositionAndLimitOnPayload(buf, PageIdUtils.itemId(row.link())); + DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); - assert buf.remaining() == rowSize; + assert data.payloadSize() == rowSize; - buf.get(payload); - buf.position(0); + PageUtils.getBytes(buf, data.offset(), payload, 0, rowSize); wal.log(new DataPageInsertRecord( cacheId, @@ -149,7 +148,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { */ private int addRowFragment( Page page, - ByteBuffer buf, + long buf, DataPageIO io, CacheDataRow row, int written, @@ -158,17 +157,17 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // Read last link before the fragment write, because it will be updated there. long lastLink = row.link(); - int payloadSize = io.addRowFragment(buf, row, written, rowSize); + int payloadSize = io.addRowFragment(pageMem, buf, row, written, rowSize, pageSize()); - assert payloadSize > 0: payloadSize; + assert payloadSize > 0 : payloadSize; if (isWalDeltaRecordNeeded(wal, page)) { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[payloadSize]; - io.setPositionAndLimitOnPayload(buf, PageIdUtils.itemId(row.link())); - buf.get(payload); - buf.position(0); + DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); + + PageUtils.getBytes(buf, data.offset(), payload, 0, payloadSize); wal.log(new DataPageInsertFragmentRecord(cacheId, page.id(), payload, lastLink)); } @@ -179,15 +178,15 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { /** */ private final PageHandler<Void, Long> rmvRow = new PageHandler<Void, Long>() { - @Override public Long run(Page page, PageIO iox, ByteBuffer buf, Void arg, int itemId) + @Override public Long run(Page page, PageIO iox, long pageAddr, Void arg, int itemId) throws IgniteCheckedException { DataPageIO io = (DataPageIO)iox; - int oldFreeSpace = io.getFreeSpace(buf); + int oldFreeSpace = io.getFreeSpace(pageAddr); assert oldFreeSpace >= 0: oldFreeSpace; - long nextLink = io.removeRow(buf, itemId); + long nextLink = io.removeRow(pageAddr, itemId, pageSize()); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new DataPageRemoveRecord(cacheId, page.id(), itemId)); @@ -200,7 +199,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // put(null, page, buf, REUSE_BUCKET); // } - int newFreeSpace = io.getFreeSpace(buf); + int newFreeSpace = io.getFreeSpace(pageAddr); if (newFreeSpace > MIN_PAGE_FREE_SPACE) { int newBucket = bucket(newFreeSpace, false); @@ -210,12 +209,12 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { if (oldBucket != newBucket) { // It is possible that page was concurrently taken for put, in this case put will handle bucket change. - if (removeDataPage(page, buf, io, oldBucket)) - put(null, page, buf, newBucket); + if (removeDataPage(page, pageAddr, io, oldBucket)) + put(null, page, pageAddr, newBucket); } } else - put(null, page, buf, newBucket); + put(null, page, pageAddr, newBucket); } // For common case boxed 0L will be cached inside of Long, so no garbage will be produced. @@ -326,7 +325,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // If it is an existing page, we do not need to initialize it. DataPageIO init = reuseBucket || pageId == 0L ? DataPageIO.VERSIONS.latest() : null; - written = writePage(page, this, writeRow, init, wal, row, written, FAIL_I); + written = writePage(pageMem, page, this, writeRow, init, wal, row, written, FAIL_I); assert written != FAIL_I; // We can't fail here. } @@ -344,7 +343,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { long nextLink; try (Page page = pageMem.page(cacheId, pageId)) { - nextLink = writePage(page, this, rmvRow, null, itemId, FAIL_L); + nextLink = writePage(pageMem, page, this, rmvRow, null, itemId, FAIL_L); assert nextLink != FAIL_L; // Can't fail here. } @@ -354,7 +353,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { pageId = PageIdUtils.pageId(nextLink); try (Page page = pageMem.page(cacheId, pageId)) { - nextLink = writePage(page, this, rmvRow, null, itemId, FAIL_L); + nextLink = writePage(pageMem, page, this, rmvRow, null, itemId, FAIL_L); assert nextLink != FAIL_L; // Can't fail here. } @@ -380,7 +379,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException { assert reuseList == this: "not allowed to be a reuse list"; - put(bag, null, null, REUSE_BUCKET); + put(bag, null, 0L, REUSE_BUCKET); } /** {@inheritDoc} */