Repository: ignite
Updated Branches:
  refs/heads/ignite-direct-marsh-opt f33e64309 -> 393b630ef


Optimizations for direct marshalling


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/393b630e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/393b630e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/393b630e

Branch: refs/heads/ignite-direct-marsh-opt
Commit: 393b630ef976bf59844dfb00a2c9aa2e29ad7d6f
Parents: f33e643
Author: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Authored: Mon Nov 16 22:04:57 2015 -0800
Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Committed: Mon Nov 16 22:04:57 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java | 307 ++++++++++++++-----
 .../internal/direct/DirectMessageWriter.java    |   9 +-
 2 files changed, 234 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/393b630e/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index ccbfe09..e05e20d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -23,8 +23,9 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
+import java.util.RandomAccess;
 import java.util.UUID;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -256,7 +257,7 @@ public class DirectByteBufferStream {
     private Iterator<?> it;
 
     /** */
-    private Iterator<?> arrIt;
+    private int arrPos = -1;
 
     /** */
     private Object arrCur = NULL;
@@ -292,6 +293,18 @@ public class DirectByteBufferStream {
     private int primShift;
 
     /** */
+    private int uuidState;
+
+    /** */
+    private long uuidMost;
+
+    /** */
+    private long uuidLeast;
+
+    /** */
+    private long uuidLocId;
+
+    /** */
     private boolean lastFinished;
 
     /** */
@@ -503,7 +516,7 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeShortArray(short[] val) {
         if (val != null)
@@ -513,7 +526,7 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeIntArray(int[] val) {
         if (val != null)
@@ -523,7 +536,7 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeLongArray(long[] val) {
         if (val != null)
@@ -533,7 +546,7 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeFloatArray(float[] val) {
         if (val != null)
@@ -543,7 +556,7 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeDoubleArray(double[] val) {
         if (val != null)
@@ -553,7 +566,7 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeCharArray(char[] val) {
         if (val != null)
@@ -563,7 +576,7 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeBooleanArray(boolean[] val) {
         if (val != null)
@@ -573,31 +586,87 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeString(String val) {
         writeByteArray(val != null ? val.getBytes() : null);
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeBitSet(BitSet val) {
         writeLongArray(val != null ? val.toLongArray() : null);
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeUuid(UUID val) {
-        writeByteArray(val != null ? U.uuidToBytes(val) : null);
+        switch (uuidState) {
+            case 0:
+                writeBoolean(val == null);
+
+                if (!lastFinished || val == null)
+                    return;
+
+                uuidState++;
+
+            case 1:
+                writeLong(val.getMostSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 2:
+                writeLong(val.getLeastSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState = 0;
+        }
     }
 
     /**
-     * @param val Value
+     * @param val Value.
      */
     public void writeIgniteUuid(IgniteUuid val) {
-        writeByteArray(val != null ? U.igniteUuidToBytes(val) : null);
+        switch (uuidState) {
+            case 0:
+                writeBoolean(val == null);
+
+                if (!lastFinished || val == null)
+                    return;
+
+                uuidState++;
+
+            case 1:
+                writeLong(val.globalId().getMostSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 2:
+                writeLong(val.globalId().getLeastSignificantBits());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState++;
+
+            case 3:
+                writeLong(val.localId());
+
+                if (!lastFinished)
+                    return;
+
+                uuidState = 0;
+        }
     }
 
     /**
@@ -629,18 +698,20 @@ public class DirectByteBufferStream {
      */
     public <T> void writeObjectArray(T[] arr, MessageCollectionItemType 
itemType, MessageWriter writer) {
         if (arr != null) {
-            if (arrIt == null) {
-                writeInt(arr.length);
+            int len = arr.length;
+
+            if (arrPos == -1) {
+                writeInt(len);
 
                 if (!lastFinished)
                     return;
 
-                arrIt = arrayIterator(arr);
+                arrPos = 0;
             }
 
-            while (arrIt.hasNext() || arrCur != NULL) {
+            while (arrPos < len || arrCur != NULL) {
                 if (arrCur == NULL)
-                    arrCur = arrIt.next();
+                    arrCur = arr[arrPos++];
 
                 write(itemType, arrCur, writer);
 
@@ -650,7 +721,7 @@ public class DirectByteBufferStream {
                 arrCur = NULL;
             }
 
-            arrIt = null;
+            arrPos = -1;
         }
         else
             writeInt(-1);
@@ -691,6 +762,44 @@ public class DirectByteBufferStream {
     }
 
     /**
+     * @param list List.
+     * @param itemType Component type.
+     * @param writer Writer.
+     */
+    public <T> void writeRandomAccessList(List<T> list, 
MessageCollectionItemType itemType, MessageWriter writer) {
+        if (list != null) {
+            assert list instanceof RandomAccess;
+
+            int size = list.size();
+
+            if (arrPos == -1) {
+                writeInt(size);
+
+                if (!lastFinished)
+                    return;
+
+                arrPos = 0;
+            }
+
+            while (arrPos < size || arrCur != NULL) {
+                if (arrCur == NULL)
+                    arrCur = list.get(arrPos++);
+
+                write(itemType, arrCur, writer);
+
+                if (!lastFinished)
+                    return;
+
+                arrCur = NULL;
+            }
+
+            arrPos = -1;
+        }
+        else
+            writeInt(-1);
+    }
+
+    /**
      * @param map Map.
      * @param keyType Key type.
      * @param valType Value type.
@@ -783,20 +892,17 @@ public class DirectByteBufferStream {
 
         int val = 0;
 
-        int initPos = buf.position();
-        int shift = 0;
-
         while (buf.hasRemaining()) {
-            byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift);
+            int pos = buf.position();
 
-            prim |= ((long)b & 0x7F) << (7 * primShift);
+            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+
+            buf.position(pos + 1);
 
-            primShift++;
-            shift++;
+            prim |= ((long)b & 0x7F) << (7 * primShift);
 
             if ((b & 0x80) == 0) {
                 lastFinished = true;
-                primShift = 0;
 
                 val = (int)prim;
 
@@ -806,13 +912,14 @@ public class DirectByteBufferStream {
                     val--;
 
                 prim = 0;
+                primShift = 0;
 
                 break;
             }
+            else
+                primShift++;
         }
 
-        buf.position(initPos + shift);
-
         return val;
     }
 
@@ -824,20 +931,17 @@ public class DirectByteBufferStream {
 
         long val = 0;
 
-        int initPos = buf.position();
-        int shift = 0;
-
         while (buf.hasRemaining()) {
-            byte b = UNSAFE.getByte(heapArr, baseOff + initPos + shift);
+            int pos = buf.position();
 
-            prim |= ((long)b & 0x7F) << (7 * primShift);
+            byte b = UNSAFE.getByte(heapArr, baseOff + pos);
+
+            buf.position(pos + 1);
 
-            shift++;
-            primShift++;
+            prim |= ((long)b & 0x7F) << (7 * primShift);
 
             if ((b & 0x80) == 0) {
                 lastFinished = true;
-                primShift = 0;
 
                 val = prim;
 
@@ -847,13 +951,14 @@ public class DirectByteBufferStream {
                     val--;
 
                 prim = 0;
+                primShift = 0;
 
                 break;
             }
+            else
+                primShift++;
         }
 
-        buf.position(initPos + shift);
-
         return val;
     }
 
@@ -1004,18 +1109,84 @@ public class DirectByteBufferStream {
      * @return Value.
      */
     public UUID readUuid() {
-        byte[] arr = readByteArray();
+        switch (uuidState) {
+            case 0:
+                boolean isNull = readBoolean();
+
+                if (!lastFinished || isNull)
+                    return null;
+
+                uuidState++;
+
+            case 1:
+                uuidMost = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 2:
+                uuidLeast = readLong();
 
-        return arr != null ? U.bytesToUuid(arr, 0) : null;
+                if (!lastFinished)
+                    return null;
+
+                uuidState = 0;
+        }
+
+        UUID val = new UUID(uuidMost, uuidLeast);
+
+        uuidMost = 0;
+        uuidLeast = 0;
+
+        return val;
     }
 
     /**
      * @return Value.
      */
     public IgniteUuid readIgniteUuid() {
-        byte[] arr = readByteArray();
+        switch (uuidState) {
+            case 0:
+                boolean isNull = readBoolean();
 
-        return arr != null ? U.bytesToIgniteUuid(arr, 0) : null;
+                if (!lastFinished || isNull)
+                    return null;
+
+                uuidState++;
+
+            case 1:
+                uuidMost = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 2:
+                uuidLeast = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState++;
+
+            case 3:
+                uuidLocId = readLong();
+
+                if (!lastFinished)
+                    return null;
+
+                uuidState = 0;
+        }
+
+        IgniteUuid val = new IgniteUuid(new UUID(uuidMost, uuidLeast), 
uuidLocId);
+
+        uuidMost = 0;
+        uuidLeast = 0;
+
+        return val;
     }
 
     /**
@@ -1204,7 +1375,8 @@ public class DirectByteBufferStream {
      * @return Whether array was fully written.
      */
     private boolean writeArray(Object arr, long off, int len, int bytes) {
-        assert arr == null || (arr.getClass().isArray() && 
arr.getClass().getComponentType().isPrimitive());
+        assert arr != null;
+        assert arr.getClass().isArray() && 
arr.getClass().getComponentType().isPrimitive();
         assert off > 0;
         assert len >= 0;
         assert bytes >= 0;
@@ -1224,24 +1396,24 @@ public class DirectByteBufferStream {
         int remaining = buf.remaining();
 
         if (toWrite <= remaining) {
-            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, 
toWrite);
-
-            pos += toWrite;
+            if (toWrite > 0) {
+                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, 
toWrite);
 
-            buf.position(pos);
+                buf.position(pos + toWrite);
+            }
 
             arrOff = -1;
 
             return true;
         }
         else {
-            UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, 
remaining);
-
-            pos += remaining;
+            if (remaining > 0) {
+                UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, 
remaining);
 
-            buf.position(pos);
+                buf.position(pos + remaining);
 
-            arrOff += remaining;
+                arrOff += remaining;
+            }
 
             return false;
         }
@@ -1511,31 +1683,6 @@ public class DirectByteBufferStream {
     }
 
     /**
-     * @param arr Array.
-     * @return Array iterator.
-     */
-    private Iterator<?> arrayIterator(final Object[] arr) {
-        return new Iterator<Object>() {
-            private int idx;
-
-            @Override public boolean hasNext() {
-                return idx < arr.length;
-            }
-
-            @Override public Object next() {
-                if (!hasNext())
-                    throw new NoSuchElementException();
-
-                return arr[idx++];
-            }
-
-            @Override public void remove() {
-                throw new UnsupportedOperationException();
-            }
-        };
-    }
-
-    /**
      * Array creator.
      */
     private static interface ArrayCreator<T> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/393b630e/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index ea0f37e..8ad7042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.direct;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.RandomAccess;
 import java.util.UUID;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -213,7 +215,10 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public <T> boolean writeCollection(String name, Collection<T> 
col, MessageCollectionItemType itemType) {
-        stream.writeCollection(col, itemType, this);
+        if (col instanceof List && col instanceof RandomAccess)
+            stream.writeRandomAccessList((List<T>)col, itemType, this);
+        else
+            stream.writeCollection(col, itemType, this);
 
         return stream.lastFinished();
     }
@@ -260,4 +265,4 @@ public class DirectMessageWriter implements MessageWriter {
     @Override public void reset() {
         state.reset();
     }
-}
\ No newline at end of file
+}

Reply via email to