Optimizations for direct marshalling

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d7747d81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d7747d81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d7747d81

Branch: refs/heads/ignite-direct-marsh-opt
Commit: d7747d816fc6d330759e0a1780188d4b2ef24a6d
Parents: 4cc5c9f
Author: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Authored: Sun Nov 15 22:30:23 2015 -0800
Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Committed: Sun Nov 15 22:30:23 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 152 +++++++++++--------
 1 file changed, 92 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d7747d81/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index dc6e419..ef9168c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -282,6 +282,9 @@ public class DirectByteBufferStream {
     private Map<Object, Object> map;
 
     /** */
+    private long prim;
+
+    /** */
     private boolean lastFinished;
 
     /**
@@ -353,14 +356,27 @@ public class DirectByteBufferStream {
      * @param val Value.
      */
     public void writeInt(int val) {
-        lastFinished = buf.remaining() >= 4;
+        if (val == Integer.MAX_VALUE)
+            val = Integer.MIN_VALUE;
+        else
+            val++;
+
+        lastFinished = buf.remaining() >= 5;
 
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putInt(heapArr, baseOff + pos, val);
+            while ((val & 0xFFFFFF80) != 0) {
+                byte b = (byte)(val | 0x80);
 
-            buf.position(pos + 4);
+                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            UNSAFE.putByte(heapArr, baseOff + pos++, (byte) val);
+
+            buf.position(pos);
         }
     }
 
@@ -368,14 +384,22 @@ public class DirectByteBufferStream {
      * @param val Value.
      */
     public void writeLong(long val) {
-        lastFinished = buf.remaining() >= 8;
+        lastFinished = buf.remaining() >= 10;
 
         if (lastFinished) {
             int pos = buf.position();
 
-            UNSAFE.putLong(heapArr, baseOff + pos, val);
+            while ((val & 0xFFFFFFFFFFFFFF80L) != 0) {
+                byte b = (byte)(val | 0x80);
 
-            buf.position(pos + 8);
+                UNSAFE.putByte(heapArr, baseOff + pos++, b);
+
+                val >>>= 7;
+            }
+
+            UNSAFE.putByte(heapArr, baseOff + pos++, (byte) val);
+
+            buf.position(pos);
         }
     }
 
@@ -738,34 +762,73 @@ public class DirectByteBufferStream {
      * @return Value.
      */
     public int readInt() {
-        lastFinished = buf.remaining() >= 4;
+        lastFinished = false;
 
-        if (lastFinished) {
-            int pos = buf.position();
+        int val = 0;
 
-            buf.position(pos + 4);
+        int initPos = buf.position();
+        int shift = 0;
+
+        while (buf.hasRemaining()) {
+            byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift);
+
+            prim |= ((long)b & 0x7F) << (7 * shift);
+
+            shift++;
 
-            return UNSAFE.getInt(heapArr, baseOff + pos);
+            if ((b & 0x80) == 0) {
+                lastFinished = true;
+
+                val = (int)prim;
+
+                if (val == Integer.MIN_VALUE)
+                    val = Integer.MAX_VALUE;
+                else
+                    val--;
+
+                prim = 0;
+
+                break;
+            }
         }
-        else
-            return 0;
+
+        buf.position(initPos + shift);
+
+        return val;
     }
 
     /**
      * @return Value.
      */
     public long readLong() {
-        lastFinished = buf.remaining() >= 8;
+        lastFinished = false;
 
-        if (lastFinished) {
-            int pos = buf.position();
+        long val = 0;
 
-            buf.position(pos + 8);
+        int initPos = buf.position();
+        int shift = 0;
 
-            return UNSAFE.getLong(heapArr, baseOff + pos);
+        while (buf.hasRemaining()) {
+            byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift);
+
+            prim |= ((long)b & 0x7F) << (7 * shift);
+
+            shift++;
+
+            if ((b & 0x80) == 0) {
+                lastFinished = true;
+
+                val = prim;
+
+                prim = 0;
+
+                break;
+            }
         }
-        else
-            return 0;
+
+        buf.position(initPos + shift);
+
+        return val;
     }
 
     /**
@@ -1123,35 +1186,20 @@ public class DirectByteBufferStream {
      * @param off Offset.
      * @param len Length.
      * @param bytes Length in bytes.
-     * @return Whether array was fully written
+     * @return Whether array was fully written.
      */
     private boolean writeArray(Object arr, long off, int len, int bytes) {
-        return writeArray(arr, off, len, bytes, false);
-    }
-
-    /**
-     * @param arr Array.
-     * @param off Offset.
-     * @param len Length.
-     * @param bytes Length in bytes.
-     * @param skipLen {@code true} if length should not be written.
-     * @return Whether array was fully written
-     */
-    private boolean writeArray(Object arr, long off, int len, int bytes, boolean skipLen) {
-        assert arr != null;
-        assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+        assert arr == null || (arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive());
         assert off > 0;
         assert len >= 0;
         assert bytes >= 0;
         assert bytes >= arrOff;
 
         if (arrOff == -1) {
-            if (!skipLen) {
-                if (buf.remaining() < 4)
-                    return false;
+            writeInt(len);
 
-                writeInt(len);
-            }
+            if (!lastFinished)
+                return false;
 
             arrOff = 0;
         }
@@ -1190,31 +1238,15 @@ public class DirectByteBufferStream {
      * @param off Base offset.
      * @return Array or special value if it was not fully read.
      */
-    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
-        return readArray(creator, lenShift, off, -1);
-    }
-
-    /**
-     * @param creator Array creator.
-     * @param lenShift Array length shift size.
-     * @param off Base offset.
-     * @param len Length.
-     * @return Array or special value if it was not fully read.
-     */
     @SuppressWarnings("unchecked")
-    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off, int len) {
+    private <T> T readArray(ArrayCreator<T> creator, int lenShift, long off) {
         assert creator != null;
 
         if (tmpArr == null) {
-            if (len == -1) {
-                if (buf.remaining() < 4) {
-                    lastFinished = false;
+            int len = readInt();
 
-                    return null;
-                }
-
-                len = readInt();
-            }
+            if (!lastFinished)
+                return null;
 
             switch (len) {
                 case -1:

Reply via email to