Repository: ignite
Updated Branches:
  refs/heads/ignite-direct-marsh-opt [created] 045bb6b57
Optimizations for direct marshalling

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/045bb6b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/045bb6b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/045bb6b5

Branch: refs/heads/ignite-direct-marsh-opt
Commit: 045bb6b578f6943e4e673d6be8e79eb6beb0f889
Parents: 5a116cb
Author: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Authored: Sun Nov 15 22:30:23 2015 -0800
Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Committed: Sun Nov 15 22:33:59 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 154 +++++++++++--------
 1 file changed, 93 insertions(+), 61 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/045bb6b5/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index cf56430..0201298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -286,6 +286,9 @@ public class DirectByteBufferStream {
     private Map<Object, Object> map;
 
     /** */
+    private long prim;
+
+    /** */
     private boolean lastFinished;
 
     /** */
@@ -362,14 +365,27 @@ public class DirectByteBufferStream {
      * @param val Value.
      */
     public void writeInt(int val) {
-        lastFinished = buf.remaining() >= 4;
+        if (val == Integer.MAX_VALUE)
+            val = Integer.MIN_VALUE;
+        else
+            val++;
+
+        lastFinished = buf.remaining() >= 5;
 
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putInt(heapArr, baseOff + pos, val);
+            while ((val & 0xFFFFFF80) != 0) {
+                byte b = (byte)(val | 0x80);
 
-            buf.position(pos + 4);
+                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            UNSAFE.putByte(heapArr, baseOff + pos++, (byte) val);
+
+            buf.position(pos);
         }
     }
 
@@ -377,14 +393,22 @@ public class DirectByteBufferStream {
      * @param val Value.
      */
     public void writeLong(long val) {
-        lastFinished = buf.remaining() >= 8;
+        lastFinished = buf.remaining() >= 10;
 
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putLong(heapArr, baseOff + pos, val);
+            while ((val & 0xFFFFFFFFFFFFFF80L) != 0) {
+                byte b = (byte)(val | 0x80);
 
-            buf.position(pos + 8);
+                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            UNSAFE.putByte(heapArr, baseOff + pos++, (byte) val);
+
+            buf.position(pos);
         }
     }
 
@@ -747,34 +771,73 @@ public class DirectByteBufferStream {
      * @return Value.
      */
     public int readInt() {
-        lastFinished = buf.remaining() >= 4;
+        lastFinished = false;
 
-        if (lastFinished) {
-            int pos = buf.position();
+        int val = 0;
 
-            buf.position(pos + 4);
+        int initPos = buf.position();
+        int shift = 0;
+
+        while (buf.hasRemaining()) {
+            byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift);
+
+            prim |= ((long)b & 0x7F) << (7 * shift);
+
+            shift++;
 
-            return UNSAFE.getInt(heapArr, baseOff + pos);
+            if ((b & 0x80) == 0) {
+                lastFinished = true;
+
+                val = (int)prim;
+
+                if (val == Integer.MIN_VALUE)
+                    val = Integer.MAX_VALUE;
+                else
+                    val--;
+
+                prim = 0;
+
+                break;
+            }
         }
-        else
-            return 0;
+
+        buf.position(initPos + shift);
+
+        return val;
     }
 
     /**
      * @return Value.
      */
     public long readLong() {
-        lastFinished = buf.remaining() >= 8;
+        lastFinished = false;
 
-        if (lastFinished) {
-            int pos = buf.position();
+        long val = 0;
 
-            buf.position(pos + 8);
+        int initPos = buf.position();
+        int shift = 0;
 
-            return UNSAFE.getLong(heapArr, baseOff + pos);
+        while (buf.hasRemaining()) {
+            byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift);
+
+            prim |= ((long)b & 0x7F) << (7 * shift);
+
+            shift++;
+
+            if ((b & 0x80) == 0) {
+                lastFinished = true;
+
+                val = prim;
+
+                prim = 0;
+
+                break;
+            }
         }
-        else
-            return 0;
+
+        buf.position(initPos + shift);
+
+        return val;
     }
 
     /**
@@ -1121,35 +1184,20 @@ public class DirectByteBufferStream {
      * @param off Offset.
      * @param len Length.
      * @param bytes Length in bytes.
-     * @return Whether array was fully written
+     * @return Whether array was fully written.
      */
     private boolean writeArray(Object arr, long off, int len, int bytes) {
-        return writeArray(arr, off, len, bytes, false);
-    }
-
-    /**
-     * @param arr Array.
-     * @param off Offset.
-     * @param len Length.
-     * @param bytes Length in bytes.
-     * @param skipLen {@code true} if length should not be written.
-     * @return Whether array was fully written
-     */
-    private boolean writeArray(Object arr, long off, int len, int bytes, boolean skipLen) {
-        assert arr != null;
-        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+        assert arr == null || (arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive());
         assert off > 0;
         assert len >= 0;
         assert bytes >= 0;
         assert bytes >= arrOff;
 
         if (arrOff == -1) {
-            if (!skipLen) {
-                if (buf.remaining() < 4)
-                    return false;
+            writeInt(len);
 
-                writeInt(len);
-            }
+            if (!lastFinished)
+                return false;
 
             arrOff = 0;
         }
@@ -1188,31 +1236,15 @@ public class DirectByteBufferStream {
      * @param off Base offset.
      * @return Array or special value if it was not fully read.
      */
-    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
-        return readArray(creator, lenShift, off, -1);
-    }
-
-    /**
-     * @param creator Array creator.
-     * @param lenShift Array length shift size.
-     * @param off Base offset.
-     * @param len Length.
-     * @return Array or special value if it was not fully read.
-     */
     @SuppressWarnings("unchecked")
-    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off, int len) {
+    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
         assert creator != null;
 
         if (tmpArr == null) {
-            if (len == -1) {
-                if (buf.remaining() < 4) {
-                    lastFinished = false;
+            int len = readInt();
 
-                    return null;
-                }
-
-                len = readInt();
-            }
+            if (!lastFinished)
+                return null;
 
             switch (len) {
                 case -1:
@@ -1496,4 +1528,4 @@ public class DirectByteBufferStream {
          */
         public T create(int len);
     }
-}
\ No newline at end of file
+}