Repository: ignite Updated Branches: refs/heads/ignite-11810 94f22a0c0 -> 6f1ed0daf
ignite-gg-11810 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f1ed0da Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f1ed0da Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f1ed0da Branch: refs/heads/ignite-11810 Commit: 6f1ed0daf83e363f259a3cfd3433f1bbc497aca6 Parents: 94f22a0 Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 23 13:28:01 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 23 13:28:01 2016 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/pagemem/Page.java | 4 + .../ignite/internal/pagemem/PageUtils.java | 12 + .../internal/pagemem/impl/PageNoStoreImpl.java | 10 + .../processors/cache/CacheObjectAdapter.java | 10 +- .../processors/cache/IncompleteObject.java | 13 ++ .../cache/database/CacheDataRowAdapter.java | 64 +++++- .../cache/database/DataStructure.java | 7 + .../cache/database/MetadataStorage.java | 115 ++++------ .../cache/database/freelist/FreeListImpl.java | 21 +- .../cache/database/freelist/PagesList.java | 2 +- .../database/freelist/io/PagesListMetaIO.java | 4 +- .../database/freelist/io/PagesListNodeIO.java | 4 +- .../cache/database/tree/BPlusTree.java | 18 +- .../cache/database/tree/io/BPlusIO.java | 10 +- .../cache/database/tree/io/BPlusInnerIO.java | 5 +- .../cache/database/tree/io/CacheVersionIO.java | 48 ++++ .../cache/database/tree/io/DataPageIO.java | 226 +++++++++++-------- .../cache/database/tree/io/DataPagePayload.java | 50 ++++ .../cache/database/tree/io/PageIO.java | 2 +- .../cache/database/tree/io/PageMetaIO.java | 4 +- .../database/tree/io/PagePartitionMetaIO.java | 4 +- 21 files changed, 413 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java index 89848cc..5f9d424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java @@ -95,6 +95,10 @@ public interface Page extends AutoCloseable { */ public Boolean fullPageWalRecordPolicy(); + public int size(); + + public ByteBuffer pageBuffer(); + /** * Release page. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java index 69719e1..d4ca73d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageUtils.java @@ -32,6 +32,18 @@ public class PageUtils { return unsafe.getByte(buf, off); } + public static byte[] getBytes(long buf, int off, int len) { + byte[] bytes = new byte[len]; + + unsafe.copyMemory(null, buf + off, bytes, GridUnsafe.BYTE_ARR_OFF, len); + + return bytes; + } + + public static void getBytes(long src, int srcOff, byte[] dst, int dstOff, int len) { + unsafe.copyMemory(null, src + srcOff, dst, GridUnsafe.BYTE_ARR_OFF + dstOff, len); + } + public static short getShort(long buf, int off) { return unsafe.getShort(buf, off); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java index 0ad206b..bbe2993 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java @@ -71,6 +71,11 @@ public class PageNoStoreImpl implements Page { buf = pageMem.wrapPointer(absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD, pageMem.pageSize()); } + /** {@inheritDoc} */ + @Override public ByteBuffer pageBuffer() { + return pageMem.wrapPointer(absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD, pageMem.pageSize()); + } + private long pointer() { return absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD; } @@ -175,6 +180,11 @@ public class PageNoStoreImpl implements Page { } /** {@inheritDoc} */ + @Override public int size() { + return pageMem.pageSize(); + } + + /** {@inheritDoc} */ @Override public void close() { pageMem.releasePage(this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java index a34e98d..1394fc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java @@ -164,8 +164,14 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable * @return {@code True} if data were successfully written. * @throws IgniteCheckedException If failed. */ - public static boolean putValue(byte cacheObjType, final ByteBuffer buf, int off, int len, - byte[] valBytes, final int start) throws IgniteCheckedException { + public static boolean putValue(byte cacheObjType, + final ByteBuffer buf, + int off, + int len, + byte[] valBytes, + final int start) + throws IgniteCheckedException + { int dataLen = valBytes.length; if (buf.remaining() < len) http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java index 666fc27..ce0e306 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IncompleteObject.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.nio.ByteBuffer; +import org.apache.ignite.internal.pagemem.PageUtils; /** * Incomplete object. @@ -86,4 +87,16 @@ public class IncompleteObject<T> { off += len; } + + public int readData(long buf, int remaining) { + assert data != null; + + final int len = Math.min(data.length - off, remaining); + + PageUtils.getBytes(buf, 0, data, off, len); + + off += len; + + return len; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java index b5babc4..59e2fc4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageIdUtils; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -29,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.IncompleteObject; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -68,7 +70,6 @@ public class CacheDataRowAdapter implements CacheDataRow { // Link can be 0 here. this.link = link; } - /** * Read row from data pages. * @@ -89,19 +90,21 @@ public class CacheDataRowAdapter implements CacheDataRow { do { try (Page page = page(pageId(nextLink), cctx)) { - ByteBuffer buf = page.getForRead(); // Non-empty data page must not be recycled. + long buf = page.getForReadPointer(); // Non-empty data page must not be recycled. - assert buf != null: nextLink; + assert buf != 0L : nextLink; try { DataPageIO io = DataPageIO.VERSIONS.forPage(buf); - nextLink = io.setPositionAndLimitOnPayload(buf, itemId(nextLink)); + DataPagePayload data = io.readPayload(buf, itemId(nextLink), page.size()); + + nextLink = data.nextLink(); if (first) { if (nextLink == 0) { // Fast path for a single page row. - readFullRow(coctx, buf, keyOnly); + readFullRow(coctx, data.offset(), keyOnly); return; } @@ -109,7 +112,12 @@ public class CacheDataRowAdapter implements CacheDataRow { first = false; } - incomplete = readFragment(coctx, buf, keyOnly, incomplete); + ByteBuffer buf0 = page.pageBuffer(); + + buf0.position(data.offset()); + buf0.limit(data.offset() + data.payloadSize()); + + incomplete = readFragment(coctx, buf0, keyOnly, incomplete); if (keyOnly && key != null) return; @@ -121,7 +129,7 @@ public class CacheDataRowAdapter implements CacheDataRow { } while(nextLink != 0); - assert isReady(): "ready"; + assert isReady() : "ready"; } /** @@ -179,8 +187,23 @@ public class CacheDataRowAdapter implements CacheDataRow { * @param keyOnly {@code true} If need to read only key object. * @throws IgniteCheckedException If failed. */ - private void readFullRow(CacheObjectContext coctx, ByteBuffer buf, boolean keyOnly) throws IgniteCheckedException { - key = coctx.processor().toKeyCacheObject(coctx, buf); + private void readFullRow(CacheObjectContext coctx, long buf, boolean keyOnly) throws IgniteCheckedException { + int off = 0; + + int len = PageUtils.getInt(buf, off); + off += 4; + + if (len == 0) + key = null; + else { + byte[] bytes = PageUtils.getBytes(buf, off, len); + off += len; + + byte type = PageUtils.getByte(buf, off); + off++; + + key = coctx.processor().toKeyCacheObject(coctx, type, bytes); + } if (keyOnly) { assert key != null: "key"; @@ -188,9 +211,26 @@ public class CacheDataRowAdapter implements CacheDataRow { return; } - val = coctx.processor().toCacheObject(coctx, buf); - ver = CacheVersionIO.read(buf, false); - expireTime = buf.getLong(); + len = PageUtils.getInt(buf, off); + off += 4; + + if (len == 0) + val = null; + else { + byte[] bytes = PageUtils.getBytes(buf, off, len); + off += len; + + byte type = PageUtils.getByte(buf, off); + off++; + + val = coctx.processor().toCacheObject(coctx, type, bytes); + } + + ver = CacheVersionIO.read(buf + off, false); + + off += CacheVersionIO.size(ver, false); + + expireTime = PageUtils.getLong(buf, off); assert isReady(): "ready"; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java index 26df029..ce5422f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java @@ -171,6 +171,13 @@ public abstract class DataStructure implements PageLockListener { PageHandler.readUnlock(page, buf, this); } + /** + * @return Page size. + */ + protected final int pageSize() { + return pageMem.pageSize(); + } + /** {@inheritDoc} */ @Override public void onBeforeWriteLock(Page page) { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java index 26151ac..b19d03d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.cache.database; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusIO; @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusInnerIO import org.apache.ignite.internal.processors.cache.database.tree.io.BPlusLeafIO; import org.apache.ignite.internal.processors.cache.database.tree.io.IOVersions; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -190,19 +191,19 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override protected int compare(final BPlusIO<IndexItem> io, final ByteBuffer buf, final int idx, + @Override protected int compare(final BPlusIO<IndexItem> io, final long buf, final int idx, final IndexItem row) throws IgniteCheckedException { final int off = ((IndexIO)io).getOffset(idx); int shift = 0; // Compare index names. - final byte len = buf.get(off + shift); + final byte len = PageUtils.getByte(off, shift); shift += BYTE_LEN; for (int i = 0; i < len && i < row.idxName.length; i++) { - final int cmp = Byte.compare(buf.get(off + i + shift), row.idxName[i]); + final int cmp = Byte.compare(PageUtils.getByte(off, i + shift), row.idxName[i]); if (cmp != 0) return cmp; @@ -212,7 +213,7 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final ByteBuffer buf, + @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long buf, final int idx) throws IgniteCheckedException { return readRow(buf, ((IndexIO)io).getOffset(idx)); } @@ -251,27 +252,18 @@ public class MetadataStorage implements MetaStore { * @param row Row to store. */ private static void storeRow( - final ByteBuffer buf, + final long buf, final int off, final IndexItem row ) { - int origPos = buf.position(); + // Index name length. + PageUtils.putByte(buf, off, (byte)row.idxName.length); - try { - buf.position(off); - - // Index name length. - buf.put((byte)row.idxName.length); - - // Index name. - buf.put(row.idxName); + // Index name. + PageUtils.putBytes(buf, off + 1, row.idxName); - // Page ID. - buf.putLong(row.pageId); - } - finally { - buf.position(origPos); - } + // Page ID. + PageUtils.putLong(buf, off + 1 + row.idxName.length, row.pageId); } /** @@ -283,39 +275,20 @@ public class MetadataStorage implements MetaStore { * @param srcOff Src buf offset. */ private static void storeRow( - final ByteBuffer dst, + final long dst, final int dstOff, - final ByteBuffer src, + final long src, final int srcOff ) { - int srcOrigPos = src.position(); - int dstOrigPos = dst.position(); - - try { - src.position(srcOff); - dst.position(dstOff); - - // Index name length. - final byte len = src.get(); - - dst.put(len); - - int lim = src.limit(); + // Index name length. + final byte len = PageUtils.getByte(src, srcOff); - src.limit(src.position() + len); + PageUtils.putByte(dst, dstOff, len); - // Index name. - dst.put(src); + PageHandler.copyMemory(src, srcOff + 1, dst, dstOff + 1, len); - src.limit(lim); - - // Page ID. - dst.putLong(src.getLong()); - } - finally { - src.position(srcOrigPos); - dst.position(dstOrigPos); - } + // Page ID. + PageUtils.putLong(dst, dstOff + 1 + len, PageUtils.getLong(src, srcOff + 1 + len)); } /** @@ -325,28 +298,17 @@ public class MetadataStorage implements MetaStore { * @param off Offset in buf. * @return Read row. */ - private static IndexItem readRow(final ByteBuffer buf, final int off) { - int origOff = buf.position(); - - try { - buf.position(off); + private static IndexItem readRow(final long buf, final int off) { + // Index name length. + final int len = PageUtils.getByte(buf, 0) & 0xFF; - // Index name length. - final int len = buf.get() & 0xFF; + // Index name. + final byte[] idxName = PageUtils.getBytes(buf, 1, len); - // Index name. - final byte[] idxName = new byte[len]; + // Page ID. + final long pageId = PageUtils.getLong(buf, off + 1 + len); - buf.get(idxName); - - // Page ID. - final long pageId = buf.getLong(); - - return new IndexItem(idxName, pageId); - } - finally { - buf.position(origOff); - } + return new IndexItem(idxName, pageId); } /** @@ -378,19 +340,19 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override public void storeByOffset(ByteBuffer buf, int off, IndexItem row) throws IgniteCheckedException { + @Override public void storeByOffset(long buf, int off, IndexItem row) throws IgniteCheckedException { storeRow(buf, off, row); } /** {@inheritDoc} */ - @Override public void store(final ByteBuffer dst, final int dstIdx, final BPlusIO<IndexItem> srcIo, - final ByteBuffer src, + @Override public void store(final long dst, final int dstIdx, final BPlusIO<IndexItem> srcIo, + final long src, final int srcIdx) throws IgniteCheckedException { storeRow(dst, offset(dstIdx), src, ((IndexIO)srcIo).getOffset(srcIdx)); } /** {@inheritDoc} */ - @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final ByteBuffer buf, + @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final long buf, final int idx) throws IgniteCheckedException { return readRow(buf, offset(idx)); } @@ -419,19 +381,22 @@ public class MetadataStorage implements MetaStore { } /** {@inheritDoc} */ - @Override public void storeByOffset(ByteBuffer buf, int off, IndexItem row) throws IgniteCheckedException { + @Override public void storeByOffset(long buf, int off, IndexItem row) throws IgniteCheckedException { storeRow(buf, off, row); } /** {@inheritDoc} */ - @Override public void store(final ByteBuffer dst, final int dstIdx, final BPlusIO<IndexItem> srcIo, - final ByteBuffer src, + @Override public void store(final long dst, + final int dstIdx, + final BPlusIO<IndexItem> srcIo, + final long src, final int srcIdx) throws IgniteCheckedException { storeRow(dst, offset(dstIdx), src, ((IndexIO)srcIo).getOffset(srcIdx)); } /** {@inheritDoc} */ - @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, final ByteBuffer buf, + @Override public IndexItem getLookupRow(final BPlusTree<IndexItem, ?> tree, + final long buf, final int idx) throws IgniteCheckedException { return readRow(buf, offset(idx)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java index 1d2524d..23f50f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java @@ -17,13 +17,13 @@ package org.apache.ignite.internal.processors.cache.database.freelist; -import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicReferenceArray; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.Page; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord; @@ -31,6 +31,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO; import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.database.tree.io.DataPagePayload; import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseBag; import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList; @@ -114,19 +115,17 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { CacheDataRow row, int rowSize ) throws IgniteCheckedException { - // TODO: context parameter. - io.addRow(buf, row, rowSize); + io.addRow(buf, row, rowSize, pageSize()); if (isWalDeltaRecordNeeded(wal, page)) { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[rowSize]; - io.setPositionAndLimitOnPayload(buf, PageIdUtils.itemId(row.link())); + DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); - assert buf.remaining() == rowSize; + assert data.payloadSize() == rowSize; - buf.get(payload); - buf.position(0); + PageUtils.getBytes(buf, data.offset(), payload, 0, rowSize); wal.log(new DataPageInsertRecord( cacheId, @@ -166,9 +165,9 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { // TODO This record must contain only a reference to a logical WAL record with the actual data. byte[] payload = new byte[payloadSize]; - io.setPositionAndLimitOnPayload(buf, PageIdUtils.itemId(row.link())); - buf.get(payload); - buf.position(0); + DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize()); + + PageUtils.getBytes(buf, data.offset(), payload, 0, payloadSize); wal.log(new DataPageInsertFragmentRecord(cacheId, page.id(), payload, lastLink)); } @@ -187,7 +186,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList { assert oldFreeSpace >= 0: oldFreeSpace; - long nextLink = io.removeRow(buf, itemId); + long nextLink = io.removeRow(buf, itemId, pageSize()); if (isWalDeltaRecordNeeded(wal, page)) wal.log(new DataPageRemoveRecord(cacheId, page.id(), itemId)); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java index 8311b65..f925daa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/PagesList.java @@ -884,7 +884,7 @@ public abstract class PagesList extends DataStructure { PageIO initIo = initIoVers.latest(); - initIo.initNewPage(tailBuf, tailId); + initIo.initNewPage(tailBuf, tailId, pageSize()); if (isWalDeltaRecordNeeded(wal, tail)) { wal.log(new InitNewPageRecord(cacheId, tail.id(), initIo.getType(), http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListMetaIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListMetaIO.java index 6ac8cde..67e2667 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListMetaIO.java @@ -53,8 +53,8 @@ public class PagesListMetaIO extends PageIO { } /** {@inheritDoc} */ - @Override public void initNewPage(long buf, long pageId) { - super.initNewPage(buf, pageId); + @Override public void initNewPage(long buf, long pageId, int pageSize) { + super.initNewPage(buf, pageId, pageSize); setCount(buf, 0); setNextMetaPageId(buf, 0L); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListNodeIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListNodeIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListNodeIO.java index c92e4a9..30f47d6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListNodeIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/io/PagesListNodeIO.java @@ -52,8 +52,8 @@ public class PagesListNodeIO extends PageIO { } /** {@inheritDoc} */ - @Override public void initNewPage(long buf, long pageId) { - super.initNewPage(buf, pageId); + @Override public void initNewPage(long buf, long pageId, int pageSize) { + super.initNewPage(buf, pageId, pageSize); setEmpty(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java index b8a1004..a92f39a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java @@ -408,7 +408,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { // We may need to replace inner key or want to merge this leaf with sibling after the remove -> keep lock. if (needReplaceInner || // We need to make sure that we have back or forward to be able to merge. - ((r.fwdId != 0 || r.backId != 0) && mayMerge(cnt - 1, io.getMaxCount(pageMem.pageSize(), buf)))) { + ((r.fwdId != 0 || r.backId != 0) && mayMerge(cnt - 1, io.getMaxCount(pageSize(), buf)))) { // If we have backId then we've already locked back page, nothing to do here. if (r.fwdId != 0 && r.backId == 0) { Result res = r.lockForward(0); @@ -527,7 +527,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { assert lvl == io.getRootLevel(buf); // Can drop only root. - io.cutRoot(pageMem.pageSize(), buf); + io.cutRoot(pageSize(), buf); if (needWalDeltaRecord(meta)) wal.log(new MetaPageCutRootRecord(cacheId, meta.id())); @@ -547,7 +547,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { assert lvl == io.getLevelsCount(buf); - io.addRoot(pageMem.pageSize(), buf, rootPageId); + io.addRoot(pageSize(), buf, rootPageId); if (needWalDeltaRecord(meta)) wal.log(new MetaPageAddRootRecord(cacheId, meta.id(), rootPageId)); @@ -565,7 +565,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { // Safe cast because we should never recycle meta page until the tree is destroyed. BPlusMetaIO io = (BPlusMetaIO)iox; - io.initRoot(pageMem.pageSize(), buf, rootId); + io.initRoot(pageSize(), buf, rootId); if (needWalDeltaRecord(meta)) wal.log(new MetaPageInitRootRecord(cacheId, meta.id(), rootId)); @@ -1721,7 +1721,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { } // Update forward page. - io.splitForwardPage(buf, fwdId, fwdBuf, mid, cnt); + io.splitForwardPage(buf, fwdId, fwdBuf, mid, cnt, pageSize()); // TODO GG-11640 log a correct forward page record. fwd.fullPageWalRecordPolicy(Boolean.TRUE); @@ -2184,7 +2184,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { */ private L insert(Page page, BPlusIO<L> io, long buf, int idx, int lvl) throws IgniteCheckedException { - int maxCnt = io.getMaxCount(pageMem.pageSize(), buf); + int maxCnt = io.getMaxCount(pageSize(), buf); int cnt = io.getCount(buf); if (cnt == maxCnt) // Need to split page. @@ -2284,7 +2284,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { long pageId = PageIO.getPageId(buf); - inner(io).initNewRoot(newRootBuf, newRootId, pageId, moveUpRow, null, fwdId); + inner(io).initNewRoot(newRootBuf, newRootId, pageId, moveUpRow, null, fwdId, pageSize()); if (needWalDeltaRecord(newRoot)) wal.log(new NewRootInitRecord<>(cacheId, newRoot.id(), newRootId, @@ -2624,7 +2624,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { // Exit: we are done. } else if (tail.sibling != null && - tail.getCount() + tail.sibling.getCount() < tail.io.getMaxCount(pageMem.pageSize(), tail.buf)) { + tail.getCount() + tail.sibling.getCount() < tail.io.getMaxCount(pageSize(), tail.buf)) { // Release everything lower than tail, we've already merged this path. doReleaseTail(tail.down); tail.down = null; @@ -2939,7 +2939,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure { boolean emptyBranch = needMergeEmptyBranch == TRUE || needMergeEmptyBranch == READY; - if (!left.io.merge(pageMem.pageSize(), prnt.io, prnt.buf, prntIdx, left.buf, right.buf, emptyBranch)) + if (!left.io.merge(pageSize(), prnt.io, prnt.buf, prntIdx, left.buf, right.buf, emptyBranch)) return false; // Invalidate indexes after successful merge. http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java index 6c739a5..7ebd2f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java @@ -72,8 +72,8 @@ public abstract class BPlusIO<L> extends PageIO { } /** {@inheritDoc} */ - @Override public void initNewPage(long buf, long pageId) { - super.initNewPage(buf, pageId); + @Override public void initNewPage(long buf, long pageId, int pageSize) { + super.initNewPage(buf, pageId, pageSize); setCount(buf, 0); setForward(buf, 0); @@ -263,6 +263,7 @@ public abstract class BPlusIO<L> extends PageIO { * @param fwdBuf Forward buffer. * @param mid Bisection index. * @param cnt Initial elements count in the page being split. + * @param pageSize Page size. * @throws IgniteCheckedException If failed. */ public void splitForwardPage( @@ -270,9 +271,10 @@ public abstract class BPlusIO<L> extends PageIO { long fwdId, long fwdBuf, int mid, - int cnt + int cnt, + int pageSize ) throws IgniteCheckedException { - initNewPage(fwdBuf, fwdId); + initNewPage(fwdBuf, fwdId, pageSize); cnt -= mid; http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java index d0ead51..ebf7522 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java @@ -157,9 +157,10 @@ public abstract class BPlusInnerIO<L> extends BPlusIO<L> { long leftChildId, L row, byte[] rowBytes, - long rightChildId + long rightChildId, + int pageSize ) throws IgniteCheckedException { - initNewPage(newRootBuf, newRootId); + initNewPage(newRootBuf, newRootId, pageSize); setCount(newRootBuf, 1); setLeft(newRootBuf, 0, leftChildId); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java index ed75d12..db36ce2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.database.tree.io; +import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -102,6 +103,53 @@ public class CacheVersionIO { * @return Size of serialized version. * @throws IgniteCheckedException If failed. */ + public static int readSize(ByteBuffer buf, boolean allowNull) throws IgniteCheckedException { + byte protoVer = checkProtocolVersion(buf.get(buf.position()), allowNull); + + switch (protoVer) { + case NULL_PROTO_VER: + return NULL_SIZE; + + case 1: + return SIZE_V1; + + default: + throw new IllegalStateException(); + } + } + + /** + * Reads GridCacheVersion instance from the given buffer. Moves buffer's position by the number of used + * bytes. + * + * @param buf Byte buffer. + * @param allowNull Is {@code null} version allowed. + * @return Version. + * @throws IgniteCheckedException If failed. + */ + public static GridCacheVersion read(ByteBuffer buf, boolean allowNull) throws IgniteCheckedException { + byte protoVer = checkProtocolVersion(buf.get(), allowNull); + + if (protoVer == NULL_PROTO_VER) + return null; + + int topVer = buf.getInt(); + int nodeOrderDrId = buf.getInt(); + long globalTime = buf.getLong(); + long order = buf.getLong(); + + return new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order); + } + + /** + * Gets needed buffer size to read the whole version instance. + * Does not change buffer position. + * + * @param buf Buffer. + * @param allowNull Is {@code null} version allowed. + * @return Size of serialized version. + * @throws IgniteCheckedException If failed. + */ public static int readSize(long buf, boolean allowNull) throws IgniteCheckedException { byte protoVer = checkProtocolVersion(PageUtils.getByte(buf, 0), allowNull); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java index ca8ea39..31fb19b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.database.tree.io; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -88,21 +89,22 @@ public class DataPageIO extends PageIO { } /** {@inheritDoc} */ - @Override public void initNewPage(long buf, long pageId) { - super.initNewPage(buf, pageId); + @Override public void initNewPage(long buf, long pageId, int pageSize) { + super.initNewPage(buf, pageId, pageSize); - setEmptyPage(buf); + setEmptyPage(buf, pageSize); setFreeListPageId(buf, 0L); } /** * @param buf Buffer. + * @param pageSize Page size. */ - private void setEmptyPage(long buf) { + private void setEmptyPage(long buf, int pageSize) { setDirectCount(buf, 0); setIndirectCount(buf, 0); - setFirstEntryOffset(buf, buf.capacity()); - setRealFreeSpace(buf, buf.capacity() - ITEMS_OFF); + setFirstEntryOffset(buf, pageSize, pageSize); + setRealFreeSpace(buf, pageSize - ITEMS_OFF, pageSize); } /** @@ -163,9 +165,10 @@ public class DataPageIO extends PageIO { /** * @param buf Buffer. * @param dataOff Entry data offset. + * @param pageSize Page size. */ - private void setFirstEntryOffset(long buf, int dataOff) { - assert dataOff >= ITEMS_OFF + ITEM_SIZE && dataOff <= buf.capacity(): dataOff; + private void setFirstEntryOffset(long buf, int dataOff, int pageSize) { + assert dataOff >= ITEMS_OFF + ITEM_SIZE && dataOff <= pageSize : dataOff; PageUtils.putShort(buf, FIRST_ENTRY_OFF, (short)dataOff); } @@ -182,8 +185,8 @@ public class DataPageIO extends PageIO { * @param buf Buffer. * @param freeSpace Free space. */ - private void setRealFreeSpace(long buf, int freeSpace) { - assert freeSpace == actualFreeSpace(buf): freeSpace + " != " + actualFreeSpace(buf); + private void setRealFreeSpace(long buf, int freeSpace, int pageSize) { + assert freeSpace == actualFreeSpace(buf, pageSize): freeSpace + " != " + actualFreeSpace(buf, pageSize); PageUtils.putShort(buf, FREE_SPACE_OFF, (short)freeSpace); } @@ -218,7 +221,7 @@ public class DataPageIO extends PageIO { } /** - * Equivalent for {@link #actualFreeSpace(long)} but reads saved value. + * Equivalent for {@link #actualFreeSpace(long, int)} but reads saved value. * * @param buf Buffer. * @return Free space. @@ -316,9 +319,10 @@ public class DataPageIO extends PageIO { /** * @param buf Buffer. + * @param pageSize Page size. * @return String representation. */ - public String printPageLayout(long buf) { + public String printPageLayout(long buf, int pageSize) { int directCnt = getDirectCount(buf); int indirectCnt = getIndirectCount(buf); int free = getRealFreeSpace(buf); @@ -337,7 +341,7 @@ public class DataPageIO extends PageIO { short item = getItem(buf, i); - if (item < ITEMS_OFF || item >= buf.capacity()) + if (item < ITEMS_OFF || item >= pageSize) valid = false; entriesSize += getPageEntrySize(buf, item, SHOW_PAYLOAD_LEN | SHOW_LINK); @@ -375,7 +379,7 @@ public class DataPageIO extends PageIO { b.a("][free=").a(free); - int actualFree = buf.capacity() - ITEMS_OFF - (entriesSize + (directCnt + indirectCnt) * ITEM_SIZE); + int actualFree = pageSize - ITEMS_OFF - (entriesSize + (directCnt + indirectCnt) * ITEM_SIZE); if (free != actualFree) { b.a(", actualFree=").a(actualFree); @@ -395,19 +399,19 @@ public class DataPageIO extends PageIO { * @param itemId Fixed item ID (the index used for referencing an entry from the outside). * @return Data entry offset in bytes. */ - private int getDataOffset(long buf, int itemId) { + private int getDataOffset(long buf, int itemId, int pageSize) { assert checkIndex(itemId): itemId; int directCnt = getDirectCount(buf); - assert directCnt > 0: "itemId=" + itemId + ", directCnt=" + directCnt + ", page=" + printPageLayout(buf); + assert directCnt > 0: "itemId=" + itemId + ", directCnt=" + directCnt + ", page=" + printPageLayout(buf, pageSize); if (itemId >= directCnt) { // Need to do indirect lookup. int indirectCnt = getIndirectCount(buf); // Must have indirect items here. assert indirectCnt > 0: "itemId=" + itemId + ", directCnt=" + directCnt + ", indirectCnt=" + indirectCnt + - ", page=" + printPageLayout(buf); + ", page=" + printPageLayout(buf, pageSize); int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt); @@ -449,18 +453,16 @@ public class DataPageIO extends PageIO { * @param itemId Item to position on. * @return Link to the next fragment or {@code 0} if it is the last fragment or the data row is not fragmented. */ - public long setPositionAndLimitOnPayload(final long buf, final int itemId) { - int dataOff = getDataOffset(buf, itemId); + public DataPagePayload readPayload(final long buf, final int itemId, final int pageSize) { + int dataOff = getDataOffset(buf, itemId, pageSize); boolean fragmented = isFragmented(buf, dataOff); long nextLink = fragmented ? getNextFragmentLink(buf, dataOff) : 0; int payloadSize = getPageEntrySize(buf, dataOff, 0); - buf.position(dataOff + PAYLOAD_LEN_SIZE + (fragmented ? LINK_SIZE : 0)); - - buf.limit(buf.position() + payloadSize); - - return nextLink; + return new DataPagePayload(dataOff + PAYLOAD_LEN_SIZE + (fragmented ? LINK_SIZE : 0), + payloadSize, + nextLink); } /** @@ -593,13 +595,14 @@ public class DataPageIO extends PageIO { /** * @param buf Buffer. * @param itemId Fixed item ID (the index used for referencing an entry from the outside). + * @param pageSize Page size. * @return Next link for fragmented entries or {@code 0} if none. * @throws IgniteCheckedException If failed. */ - public long removeRow(long buf, int itemId) throws IgniteCheckedException { + public long removeRow(long buf, int itemId, int pageSize) throws IgniteCheckedException { assert checkIndex(itemId) : itemId; - final int dataOff = getDataOffset(buf, itemId); + final int dataOff = getDataOffset(buf, itemId, pageSize); final long nextLink = isFragmented(buf, dataOff) ? getNextFragmentLink(buf, dataOff) : 0; // Record original counts to calculate delta in free space in the end of remove. @@ -615,7 +618,7 @@ public class DataPageIO extends PageIO { assert (indirectCnt == 0 && itemId == 0) || (indirectCnt == 1 && itemId == itemId(getItem(buf, 1))) : itemId; - setEmptyPage(buf); + setEmptyPage(buf, pageSize); } else { // Get the entry size before the actual remove. @@ -642,15 +645,15 @@ public class DataPageIO extends PageIO { if (indirectId == 0) {// For the last direct item with no indirect item. if (dropLast) - moveItems(buf, directCnt, indirectCnt, -1); + moveItems(buf, directCnt, indirectCnt, -1, pageSize); else curIndirectCnt++; } else { if (dropLast) - moveItems(buf, directCnt, indirectId - directCnt, -1); + moveItems(buf, directCnt, indirectId - directCnt, -1, pageSize); - moveItems(buf, indirectId + 1, directCnt + indirectCnt - indirectId - 1, dropLast ? -2 : -1); + moveItems(buf, indirectId + 1, directCnt + indirectCnt - indirectId - 1, dropLast ? -2 : -1, pageSize); if (dropLast) curIndirectCnt--; @@ -662,8 +665,9 @@ public class DataPageIO extends PageIO { assert getIndirectCount(buf) <= getDirectCount(buf); // Increase free space. - setRealFreeSpace(buf, getRealFreeSpace(buf) + rmvEntrySize + - ITEM_SIZE * (directCnt - getDirectCount(buf) + indirectCnt - getIndirectCount(buf))); + setRealFreeSpace(buf, + getRealFreeSpace(buf) + rmvEntrySize + ITEM_SIZE * (directCnt - getDirectCount(buf) + indirectCnt - getIndirectCount(buf)), + pageSize); } return nextLink; @@ -674,12 +678,13 @@ public class DataPageIO extends PageIO { * @param idx Index. * @param cnt Count. * @param step Step. + * @param pageSize Page size. */ - private void moveItems(long buf, int idx, int cnt, int step) { + private void moveItems(long buf, int idx, int cnt, int step, int pageSize) { assert cnt >= 0: cnt; if (cnt != 0) - moveBytes(buf, itemOffset(idx), cnt * ITEM_SIZE, step * ITEM_SIZE); + moveBytes(buf, itemOffset(idx), cnt * ITEM_SIZE, step * ITEM_SIZE, pageSize); } /** @@ -699,12 +704,14 @@ public class DataPageIO extends PageIO { * @param buf Buffer. * @param row Cache data row. * @param rowSize Row size. + * @param pageSize Page size. * @throws IgniteCheckedException If failed. */ public void addRow( long buf, CacheDataRow row, - int rowSize + int rowSize, + int pageSize ) throws IgniteCheckedException { assert rowSize <= getFreeSpace(buf): "can't call addRow if not enough space for the whole row"; @@ -713,11 +720,11 @@ public class DataPageIO extends PageIO { int directCnt = getDirectCount(buf); int indirectCnt = getIndirectCount(buf); - int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt); + int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt, pageSize); writeRowData(buf, dataOff, rowSize, row); - int itemId = addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff); + int itemId = addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); setLink(row, buf, itemId); } @@ -726,11 +733,14 @@ public class DataPageIO extends PageIO { * Adds row to this data page and sets respective link to the given row object. * * @param buf Buffer. + * @param payload Payload. + * @param pageSize Page size. * @throws IgniteCheckedException If failed. */ public void addRow( long buf, - byte[] payload + byte[] payload, + int pageSize ) throws IgniteCheckedException { assert payload.length <= getFreeSpace(buf): "can't call addRow if not enough space for the whole row"; @@ -739,11 +749,11 @@ public class DataPageIO extends PageIO { int directCnt = getDirectCount(buf); int indirectCnt = getIndirectCount(buf); - int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt); + int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt, pageSize); writeRowData(buf, dataOff, payload); - addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff); + addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); } /** @@ -752,6 +762,7 @@ public class DataPageIO extends PageIO { * @param directCnt Direct items count. * @param indirectCnt Indirect items count. * @param dataOff First entry offset. + * @param pageSize Page size. * @return First entry offset after compaction. */ private int compactIfNeed( @@ -759,10 +770,11 @@ public class DataPageIO extends PageIO { final int entryFullSize, final int directCnt, final int indirectCnt, - int dataOff + int dataOff, + int pageSize ) { if (!isEnoughSpace(entryFullSize, dataOff, directCnt, indirectCnt)) { - dataOff = compactDataEntries(buf, directCnt); + dataOff = compactDataEntries(buf, directCnt, pageSize); assert isEnoughSpace(entryFullSize, dataOff, directCnt, indirectCnt); } @@ -778,20 +790,27 @@ public class DataPageIO extends PageIO { * @param directCnt Direct items count. * @param indirectCnt Indirect items count. * @param dataOff Data offset. + * @param pageSize Page size. * @return Item ID. */ - private int addItem(final long buf, final int fullEntrySize, final int directCnt, - final int indirectCnt, final int dataOff) { - setFirstEntryOffset(buf, dataOff); + private int addItem(final long buf, + final int fullEntrySize, + final int directCnt, + final int indirectCnt, + final int dataOff, + final int pageSize) + { + setFirstEntryOffset(buf, dataOff, pageSize); - int itemId = insertItem(buf, dataOff, directCnt, indirectCnt); + int itemId = insertItem(buf, dataOff, directCnt, indirectCnt, pageSize); assert checkIndex(itemId): itemId; assert getIndirectCount(buf) <= getDirectCount(buf); // Update free space. If number of indirect items changed, then we were able to reuse an item slot. - setRealFreeSpace(buf, getRealFreeSpace(buf) - fullEntrySize + - (getIndirectCount(buf) != indirectCnt ? ITEM_SIZE : 0)); + setRealFreeSpace(buf, + getRealFreeSpace(buf) - fullEntrySize + (getIndirectCount(buf) != indirectCnt ? ITEM_SIZE : 0), + pageSize); return itemId; } @@ -801,13 +820,14 @@ public class DataPageIO extends PageIO { * @param fullEntrySize Full entry size. * @param directCnt Direct items count. * @param indirectCnt Indirect items count. + * @param pageSize Page size. * @return Offset in the buffer where the entry must be written. */ - private int getDataOffsetForWrite(long buf, int fullEntrySize, int directCnt, int indirectCnt) { + private int getDataOffsetForWrite(long buf, int fullEntrySize, int directCnt, int indirectCnt, int pageSize) { int dataOff = getFirstEntryOffset(buf); // Compact if we do not have enough space for entry. - dataOff = compactIfNeed(buf, fullEntrySize, directCnt, indirectCnt, dataOff); + dataOff = compactIfNeed(buf, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); // We will write data right before the first entry. dataOff -= fullEntrySize - ITEM_SIZE; @@ -826,7 +846,7 @@ public class DataPageIO extends PageIO { * @throws IgniteCheckedException If failed. */ public int addRowFragment( - long buf, + ByteBuffer buf, CacheDataRow row, int written, int rowSize @@ -863,12 +883,13 @@ public class DataPageIO extends PageIO { * @throws IgniteCheckedException If failed. */ private int addRowFragment( - long buf, + ByteBuffer buf, int written, int rowSize, long lastLink, CacheDataRow row, - byte[] payload + byte[] payload, + int pageSize ) throws IgniteCheckedException { assert payload == null ^ row == null; @@ -881,18 +902,25 @@ public class DataPageIO extends PageIO { int fullEntrySize = getPageEntrySize(payloadSize, SHOW_PAYLOAD_LEN | SHOW_LINK | SHOW_ITEM); int dataOff = getDataOffsetForWrite(buf, fullEntrySize, directCnt, indirectCnt); - PageUtils.putShort(buf, dataOff, (short)(payloadSize | FRAGMENTED_FLAG)); - PageUtils.putLong(buf, dataOff + 2, lastLink); + try { + buf.position(dataOff); + + buf.putShort((short) (payloadSize | FRAGMENTED_FLAG)); + buf.putLong(lastLink); - if (payload == null) { - int rowOff = rowSize - written - payloadSize; + if (payload == null) { + int rowOff = rowSize - written - payloadSize; - writeFragmentData(row, buf + dataOff + 10, rowOff, payloadSize); + writeFragmentData(row, buf, rowOff, payloadSize); + } + else + buf.put(payload); + } + finally { + buf.position(0); } - else - PageUtils.putBytes(buf, dataOff + 10, payload); - int itemId = addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff); + int itemId = addItem(buf, fullEntrySize, directCnt, indirectCnt, dataOff, pageSize); if (row != null) setLink(row, buf, itemId); @@ -920,7 +948,7 @@ public class DataPageIO extends PageIO { */ private void writeFragmentData( final CacheDataRow row, - final long buf, + final ByteBuffer buf, final int rowOff, final int payloadSize ) throws IgniteCheckedException { @@ -946,7 +974,7 @@ public class DataPageIO extends PageIO { */ private int writeFragment( final CacheDataRow row, - final long buf, + final ByteBuffer buf, final int rowOff, final int payloadSize, final EntryPart type, @@ -1080,9 +1108,10 @@ public class DataPageIO extends PageIO { * @param dataOff Data offset. * @param directCnt Direct items count. * @param indirectCnt Indirect items count. + * @param pageSize Page size. * @return Item ID (insertion index). */ - private int insertItem(long buf, int dataOff, int directCnt, int indirectCnt) { + private int insertItem(long buf, int dataOff, int directCnt, int indirectCnt, int pageSize) { if (indirectCnt > 0) { // If the first indirect item is on correct place to become the last direct item, do the transition // and insert the new item into the free slot which was referenced by this first indirect item. @@ -1102,7 +1131,7 @@ public class DataPageIO extends PageIO { } // Move all the indirect items forward to make a free slot and insert new item at the end of direct items. - moveItems(buf, directCnt, indirectCnt, +1); + moveItems(buf, directCnt, indirectCnt, +1, pageSize); setItem(buf, directCnt, directItemFromOffset(dataOff)); @@ -1115,9 +1144,10 @@ public class DataPageIO extends PageIO { /** * @param buf Buffer. * @param directCnt Direct items count. + * @param pageSize Page size. * @return New first entry offset. */ - private int compactDataEntries(long buf, int directCnt) { + private int compactDataEntries(long buf, int directCnt, int pageSize) { assert checkCount(directCnt): directCnt; int[] offs = new int[directCnt]; @@ -1131,7 +1161,7 @@ public class DataPageIO extends PageIO { Arrays.sort(offs); // Move right all of the entries if possible to make the page as compact as possible to its tail. - int prevOff = buf.capacity(); + int prevOff = pageSize; for (int i = directCnt - 1; i >= 0; i--) { int off = offs[i] >>> 8; @@ -1145,7 +1175,7 @@ public class DataPageIO extends PageIO { if (delta != 0) { // Move right. assert delta > 0: delta; - moveBytes(buf, off, entrySize, delta); + moveBytes(buf, off, entrySize, delta, pageSize); int itemId = offs[i] & 0xFF; @@ -1164,9 +1194,10 @@ public class DataPageIO extends PageIO { * Full-scan free space calculation procedure. * * @param buf Buffer to scan. + * @param pageSize Page size. * @return Actual free space in the buffer. */ - private int actualFreeSpace(long buf) { + private int actualFreeSpace(long buf, int pageSize) { int directCnt = getDirectCount(buf); int entriesSize = 0; @@ -1179,7 +1210,7 @@ public class DataPageIO extends PageIO { entriesSize += entrySize; } - return buf.capacity() - ITEMS_OFF - entriesSize - (directCnt + getIndirectCount(buf)) * ITEM_SIZE; + return pageSize - ITEMS_OFF - entriesSize - (directCnt + getIndirectCount(buf)) * ITEM_SIZE; } /** @@ -1187,12 +1218,13 @@ public class DataPageIO extends PageIO { * @param off Offset. * @param cnt Count. * @param step Step. + * @param pageSize Page size. */ - private void moveBytes(long buf, int off, int cnt, int step) { + private void moveBytes(long buf, int off, int cnt, int step, int pageSize) { assert step != 0: step; assert off + step >= 0; - assert off + step + cnt <= buf.capacity() : "[off=" + off + ", step=" + step + ", cnt=" + cnt + - ", cap=" + buf.capacity() + ']'; + assert off + step + cnt <= pageSize : "[off=" + off + ", step=" + step + ", cnt=" + cnt + + ", cap=" + pageSize + ']'; PageHandler.copyMemory(buf, buf, off, off + step, cnt); } @@ -1210,26 +1242,36 @@ public class DataPageIO extends PageIO { int payloadSize, CacheDataRow row ) throws IgniteCheckedException { - try { - buf.position(dataOff); + PageUtils.putShort(buf, dataOff, (short)payloadSize); + dataOff += 2; - buf.putShort((short)payloadSize); + byte[] bytes = row.key().valueBytes(null); - boolean ok = row.key().putValue(buf); + PageUtils.putInt(buf, dataOff, bytes.length); + dataOff += 4; - assert ok; + PageUtils.putBytes(buf, dataOff, bytes); + dataOff += bytes.length; - ok = row.value().putValue(buf); + PageUtils.putByte(buf, dataOff, row.key().cacheObjectType()); + dataOff++; - assert ok; + bytes = row.value().valueBytes(null); - CacheVersionIO.write(buf, row.version(), false); + PageUtils.putInt(buf, dataOff, bytes.length); + dataOff += 4; - buf.putLong(row.expireTime()); - } - finally { - buf.position(0); - } + PageUtils.putBytes(buf, dataOff, bytes); + dataOff += bytes.length; + + PageUtils.putByte(buf, dataOff, row.value().cacheObjectType()); + dataOff++; + + CacheVersionIO.write(buf + dataOff, row.version(), false); + + dataOff += CacheVersionIO.size(row.version(), false); + + PageUtils.putLong(buf, dataOff, row.expireTime()); } /** @@ -1242,15 +1284,9 @@ public class DataPageIO extends PageIO { int dataOff, byte[] payload ) { - try { - buf.position(dataOff); + PageUtils.putShort(buf, dataOff, (short)payload.length); + dataOff += 2; - buf.putShort((short)payload.length); - - buf.put(payload); - } - finally { - buf.position(0); - } + PageUtils.putBytes(buf, dataOff, payload); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java new file mode 100644 index 0000000..203429e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPagePayload.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.database.tree.io; + +/** + * + */ +public class DataPagePayload { + /** */ + private final int off; + + /** */ + private final int payloadSize; + + /** */ + private final long nextLink; + + DataPagePayload(int off, int payloadSize, long nextLink) { + this.off = off; + this.payloadSize = payloadSize; + this.nextLink = nextLink; + } + + public int offset() { + return off; + } + + public int payloadSize() { + return payloadSize; + } + + public long nextLink() { + return nextLink; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java index 12646ae..3aee268 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java @@ -275,7 +275,7 @@ public abstract class PageIO { * @param buf Buffer. * @param pageId Page ID. */ - public void initNewPage(long buf, long pageId) { + public void initNewPage(long buf, long pageId, int pageSize) { setType(buf, getType()); setVersion(buf, getVersion()); setPageId(buf, pageId); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageMetaIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageMetaIO.java index fb8762e..7ef4db0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageMetaIO.java @@ -68,8 +68,8 @@ public class PageMetaIO extends PageIO { } /** {@inheritDoc} */ - @Override public void initNewPage(long buf, long pageId) { - super.initNewPage(buf, pageId); + @Override public void initNewPage(long buf, long pageId, int pageSize) { + super.initNewPage(buf, pageId, pageSize); setTreeRoot(buf, 0); setReuseListRoot(buf, 0); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1ed0da/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java index 72893fb..531ba1e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PagePartitionMetaIO.java @@ -42,8 +42,8 @@ public class PagePartitionMetaIO extends PageMetaIO { ); /** {@inheritDoc} */ - @Override public void initNewPage(long buf, long pageId) { - super.initNewPage(buf, pageId); + @Override public void initNewPage(long buf, long pageId, int pageSize) { + super.initNewPage(buf, pageId, pageSize); setSize(buf, 0); setUpdateCounter(buf, 0);