Reusing message reader per NIO session

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7123a73d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7123a73d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7123a73d

Branch: refs/heads/ignite-direct-marsh-opt
Commit: 7123a73d1e80d568588a3cf44ea3e513f484093c
Parents: 1f5a409
Author: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Authored: Thu Nov 12 21:34:35 2015 -0800
Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com>
Committed: Thu Nov 12 21:34:35 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectByteBufferStream.java |  53 ++++----
 .../internal/direct/DirectMessageReader.java    |  92 ++++++++++---
 .../direct/DirectMessageReaderState.java        | 128 +++++++++++++++++++
 .../internal/direct/DirectMessageWriter.java    |   4 +-
 .../managers/communication/GridIoManager.java   |  45 ++++---
 .../internal/util/nio/GridDirectParser.java     |  27 ++--
 .../extensions/communication/MessageReader.java |  19 ++-
 .../testframework/GridSpiTestContext.java       |   4 +-
 8 files changed, 287 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
index cf56430..dc6e419 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java
@@ -32,7 +32,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import sun.misc.Unsafe;
@@ -220,9 +219,6 @@ public class DirectByteBufferStream {
     private final MessageFactory msgFactory;
 
     /** */
-    private final MessageFormatter msgFormatter;
-
-    /** */
     private ByteBuffer buf;
 
     /** */
@@ -288,16 +284,11 @@ public class DirectByteBufferStream {
     /** */
     private boolean lastFinished;
 
-    /** */
-    private MessageReader reader;
-
     /**
      * @param msgFactory Message factory.
-     * @param msgFormatter Message formatter.
      */
-    public DirectByteBufferStream(MessageFactory msgFactory, MessageFormatter 
msgFormatter) {
+    public DirectByteBufferStream(MessageFactory msgFactory) {
         this.msgFactory = msgFactory;
-        this.msgFormatter = msgFormatter;
     }
 
     /**
@@ -942,7 +933,7 @@ public class DirectByteBufferStream {
      * @return Message.
      */
     @SuppressWarnings("unchecked")
-    public <T extends Message> T readMessage() {
+    public <T extends Message> T readMessage(MessageReader reader) {
         if (!msgTypeDone) {
             if (!buf.hasRemaining()) {
                 lastFinished = false;
@@ -954,13 +945,21 @@ public class DirectByteBufferStream {
 
             msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type);
 
-            if (msg != null)
-                reader = msgFormatter.reader(msgFactory, msg.getClass());
-
             msgTypeDone = true;
         }
 
-        lastFinished = msg == null || msg.readFrom(buf, reader);
+        if (msg != null) {
+            try {
+                reader.beforeInnerMessageRead();
+
+                lastFinished = msg.readFrom(buf, reader);
+            }
+            finally {
+                reader.afterInnerMessageRead(lastFinished);
+            }
+        }
+        else
+            lastFinished = true;
 
         if (lastFinished) {
             Message msg0 = msg;
@@ -977,10 +976,11 @@ public class DirectByteBufferStream {
     /**
      * @param itemType Component type.
      * @param itemCls Component class.
+     * @param reader Reader.
      * @return Array.
      */
     @SuppressWarnings("unchecked")
-    public <T> T[] readObjectArray(MessageCollectionItemType itemType, 
Class<T> itemCls) {
+    public <T> T[] readObjectArray(MessageCollectionItemType itemType, 
Class<T> itemCls, MessageReader reader) {
         if (readSize == -1) {
             int size = readInt();
 
@@ -995,7 +995,7 @@ public class DirectByteBufferStream {
                 objArr = itemCls != null ? 
(Object[])Array.newInstance(itemCls, readSize) : new Object[readSize];
 
             for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType);
+                Object item = read(itemType, reader);
 
                 if (!lastFinished)
                     return null;
@@ -1019,10 +1019,11 @@ public class DirectByteBufferStream {
 
     /**
      * @param itemType Item type.
+     * @param reader Reader.
      * @return Collection.
      */
     @SuppressWarnings("unchecked")
-    public <C extends Collection<?>> C 
readCollection(MessageCollectionItemType itemType) {
+    public <C extends Collection<?>> C 
readCollection(MessageCollectionItemType itemType, MessageReader reader) {
         if (readSize == -1) {
             int size = readInt();
 
@@ -1037,7 +1038,7 @@ public class DirectByteBufferStream {
                 col = new ArrayList<>(readSize);
 
             for (int i = readItems; i < readSize; i++) {
-                Object item = read(itemType);
+                Object item = read(itemType, reader);
 
                 if (!lastFinished)
                     return null;
@@ -1063,11 +1064,12 @@ public class DirectByteBufferStream {
      * @param keyType Key type.
      * @param valType Value type.
      * @param linked Whether linked map should be created.
+     * @param reader Reader.
      * @return Map.
      */
     @SuppressWarnings("unchecked")
     public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, 
MessageCollectionItemType valType,
-        boolean linked) {
+        boolean linked, MessageReader reader) {
         if (readSize == -1) {
             int size = readInt();
 
@@ -1083,7 +1085,7 @@ public class DirectByteBufferStream {
 
             for (int i = readItems; i < readSize; i++) {
                 if (!keyDone) {
-                    Object key = read(keyType);
+                    Object key = read(keyType, reader);
 
                     if (!lastFinished)
                         return null;
@@ -1092,7 +1094,7 @@ public class DirectByteBufferStream {
                     keyDone = true;
                 }
 
-                Object val = read(valType);
+                Object val = read(valType, reader);
 
                 if (!lastFinished)
                     return null;
@@ -1389,9 +1391,10 @@ public class DirectByteBufferStream {
 
     /**
      * @param type Type.
+     * @param reader Reader.
      * @return Value.
      */
-    private Object read(MessageCollectionItemType type) {
+    private Object read(MessageCollectionItemType type, MessageReader reader) {
         switch (type) {
             case BYTE:
                 return readByte();
@@ -1454,7 +1457,7 @@ public class DirectByteBufferStream {
                 return readIgniteUuid();
 
             case MSG:
-                return readMessage();
+                return readMessage(reader);
 
             default:
                 throw new IllegalArgumentException("Unknown type: " + type);
@@ -1496,4 +1499,4 @@ public class DirectByteBufferStream {
          */
         public T create(int len);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 7eaab76..2f91fbd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -26,7 +26,6 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.jetbrains.annotations.Nullable;
 
@@ -34,26 +33,22 @@ import org.jetbrains.annotations.Nullable;
  * Message reader implementation.
  */
 public class DirectMessageReader implements MessageReader {
-    /** Stream. */
-    private final DirectByteBufferStream stream;
+    /** State. */
+    private final DirectMessageReaderState state;
 
     /** Whether last field was fully read. */
     private boolean lastRead;
 
-    /** Current state. */
-    private int state;
-
     /**
      * @param msgFactory Message factory.
-     * @param msgFormatter Message formatter.
      */
-    public DirectMessageReader(MessageFactory msgFactory, MessageFormatter 
msgFormatter) {
-        this.stream = new DirectByteBufferStream(msgFactory, msgFormatter);
+    public DirectMessageReader(MessageFactory msgFactory) {
+        state = new DirectMessageReaderState(msgFactory);
     }
 
     /** {@inheritDoc} */
     @Override public void setBuffer(ByteBuffer buf) {
-        stream.setBuffer(buf);
+        state.stream().setBuffer(buf);
     }
 
     /** {@inheritDoc} */
@@ -69,6 +64,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public byte readByte(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         byte val = stream.readByte();
 
         lastRead = stream.lastFinished();
@@ -78,6 +75,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public short readShort(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         short val = stream.readShort();
 
         lastRead = stream.lastFinished();
@@ -87,6 +86,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public int readInt(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         int val = stream.readInt();
 
         lastRead = stream.lastFinished();
@@ -96,6 +97,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public long readLong(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         long val = stream.readLong();
 
         lastRead = stream.lastFinished();
@@ -105,6 +108,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public float readFloat(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         float val = stream.readFloat();
 
         lastRead = stream.lastFinished();
@@ -114,6 +119,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public double readDouble(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         double val = stream.readDouble();
 
         lastRead = stream.lastFinished();
@@ -123,6 +130,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public char readChar(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         char val = stream.readChar();
 
         lastRead = stream.lastFinished();
@@ -132,6 +141,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public boolean readBoolean(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         boolean val = stream.readBoolean();
 
         lastRead = stream.lastFinished();
@@ -141,6 +152,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public byte[] readByteArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         byte[] arr = stream.readByteArray();
 
         lastRead = stream.lastFinished();
@@ -150,6 +163,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public short[] readShortArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         short[] arr = stream.readShortArray();
 
         lastRead = stream.lastFinished();
@@ -159,6 +174,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public int[] readIntArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         int[] arr = stream.readIntArray();
 
         lastRead = stream.lastFinished();
@@ -168,6 +185,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public long[] readLongArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         long[] arr = stream.readLongArray();
 
         lastRead = stream.lastFinished();
@@ -177,6 +196,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public float[] readFloatArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         float[] arr = stream.readFloatArray();
 
         lastRead = stream.lastFinished();
@@ -186,6 +207,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public double[] readDoubleArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         double[] arr = stream.readDoubleArray();
 
         lastRead = stream.lastFinished();
@@ -195,6 +218,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public char[] readCharArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         char[] arr = stream.readCharArray();
 
         lastRead = stream.lastFinished();
@@ -204,6 +229,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public boolean[] readBooleanArray(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         boolean[] arr = stream.readBooleanArray();
 
         lastRead = stream.lastFinished();
@@ -213,6 +240,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public String readString(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         String val = stream.readString();
 
         lastRead = stream.lastFinished();
@@ -222,6 +251,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public BitSet readBitSet(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         BitSet val = stream.readBitSet();
 
         lastRead = stream.lastFinished();
@@ -231,6 +262,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public UUID readUuid(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         UUID val = stream.readUuid();
 
         lastRead = stream.lastFinished();
@@ -240,6 +273,8 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public IgniteUuid readIgniteUuid(String name) {
+        DirectByteBufferStream stream = state.stream();
+
         IgniteUuid val = stream.readIgniteUuid();
 
         lastRead = stream.lastFinished();
@@ -249,7 +284,9 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Nullable @Override public <T extends Message> T readMessage(String name) {
-        T msg = stream.readMessage();
+        DirectByteBufferStream stream = state.stream();
+
+        T msg = stream.readMessage(this);
 
         lastRead = stream.lastFinished();
 
@@ -258,7 +295,9 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <T> T[] readObjectArray(String name, 
MessageCollectionItemType itemType, Class<T> itemCls) {
-        T[] msg = stream.readObjectArray(itemType, itemCls);
+        DirectByteBufferStream stream = state.stream();
+
+        T[] msg = stream.readObjectArray(itemType, itemCls, this);
 
         lastRead = stream.lastFinished();
 
@@ -267,7 +306,9 @@ public class DirectMessageReader implements MessageReader {
 
     /** {@inheritDoc} */
     @Override public <C extends Collection<?>> C readCollection(String name, 
MessageCollectionItemType itemType) {
-        C col = stream.readCollection(itemType);
+        DirectByteBufferStream stream = state.stream();
+
+        C col = stream.readCollection(itemType, this);
 
         lastRead = stream.lastFinished();
 
@@ -277,7 +318,9 @@ public class DirectMessageReader implements MessageReader {
     /** {@inheritDoc} */
     @Override public <M extends Map<?, ?>> M readMap(String name, 
MessageCollectionItemType keyType,
         MessageCollectionItemType valType, boolean linked) {
-        M map = stream.readMap(keyType, valType, linked);
+        DirectByteBufferStream stream = state.stream();
+
+        M map = stream.readMap(keyType, valType, linked, this);
 
         lastRead = stream.lastFinished();
 
@@ -291,11 +334,26 @@ public class DirectMessageReader implements MessageReader 
{
 
     /** {@inheritDoc} */
     @Override public int state() {
-        return state;
+        return state.state();
     }
 
     /** {@inheritDoc} */
     @Override public void incrementState() {
-        state++;
+        state.incrementState();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeInnerMessageRead() {
+        state.beforeInnerMessageRead();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterInnerMessageRead(boolean finished) {
+        state.afterInnerMessageRead(finished);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        state.reset();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
new file mode 100644
index 0000000..d423052
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.direct;
+
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+
+/**
+ * Writer state.
+ */
+public class DirectMessageReaderState {
+    /** Initial array size. */
+    private static final int INIT_SIZE = 10;
+
+    /** Message factory. */
+    private final MessageFactory msgFactory;
+
+    /** Stack array. */
+    private StateItem[] stack;
+
+    /** Current position. */
+    private int pos;
+
+    /**
+     * @param msgFactory Message factory.
+     */
+    public DirectMessageReaderState(MessageFactory msgFactory) {
+        this.msgFactory = msgFactory;
+
+        stack = new StateItem[INIT_SIZE];
+
+        stack[0] = new StateItem(msgFactory);
+    }
+
+    /**
+     * @return Current state.
+     */
+    public int state() {
+        return stack[pos].state;
+    }
+
+    /**
+     * Increments state.
+     */
+    public void incrementState() {
+        stack[pos].state++;
+    }
+
+    /**
+     * @return Current stream.
+     */
+    public DirectByteBufferStream stream() {
+        return stack[pos].stream;
+    }
+
+    /**
+     * Callback called before inner message is written.
+     */
+    public void beforeInnerMessageRead() {
+        pos++;
+
+        // Growing never happen for Ignite messages, but we need
+        // to support it for custom messages from plugins.
+        if (pos == stack.length) {
+            StateItem[] stack0 = stack;
+
+            stack = new StateItem[stack.length << 1];
+
+            System.arraycopy(stack0, 0, stack, 0, stack0.length);
+        }
+
+        if (stack[pos] == null)
+            stack[pos] = new StateItem(msgFactory);
+    }
+
+    /**
+     * Callback called after inner message is written.
+     *
+     * @param finished Whether message was fully written.
+     */
+    public void afterInnerMessageRead(boolean finished) {
+        if (finished)
+            stack[pos].state = 0;
+
+        pos--;
+    }
+
+    /**
+     * Resets state.
+     */
+    public void reset() {
+        assert pos == 0;
+
+        stack[0].state = 0;
+    }
+
+    /**
+     * State item.
+     */
+    private static class StateItem {
+        /** Stream. */
+        private final DirectByteBufferStream stream;
+
+        /** State. */
+        private int state;
+
+        /**
+         * @param msgFactory Message factory.
+         */
+        public StateItem(MessageFactory msgFactory) {
+            stream = new DirectByteBufferStream(msgFactory);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index ea0f37e..3132e3a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable;
  */
 public class DirectMessageWriter implements MessageWriter {
     /** Stream. */
-    private final DirectByteBufferStream stream = new 
DirectByteBufferStream(null, null);
+    private final DirectByteBufferStream stream = new 
DirectByteBufferStream(null);
 
     /** State. */
     private final DirectMessageWriterState state = new 
DirectMessageWriterState();
@@ -260,4 +260,4 @@ public class DirectMessageWriter implements MessageWriter {
     @Override public void reset() {
         state.reset();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index b8af8da..3d4fbef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -17,6 +17,26 @@
 
 package org.apache.ignite.internal.managers.communication;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -64,27 +84,6 @@ import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedDeque8;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -282,7 +281,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
                 }
 
                 @Override public MessageReader reader(MessageFactory factory, 
Class<? extends Message> msgCls) {
-                    return new DirectMessageReader(msgFactory, this);
+                    return new DirectMessageReader(msgFactory);
                 }
             };
         }
@@ -2432,4 +2431,4 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             return S.toString(DelayedMessage.class, this, super.toString());
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 987090d..5140cce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -58,29 +58,26 @@ public class GridDirectParser implements GridNioParser {
     /** {@inheritDoc} */
     @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer 
buf)
         throws IOException, IgniteCheckedException {
-        Message msg = ses.removeMeta(MSG_META_KEY);
+        MessageReader reader = ses.meta(READER_META_KEY);
 
-        MessageReader reader = null;
+        if (reader == null)
+            ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, 
null)); // TODO: class in null
 
-        if (msg == null && buf.hasRemaining()) {
-            msg = msgFactory.create(buf.get());
+        Message msg = ses.removeMeta(MSG_META_KEY);
 
-            ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, 
msg.getClass()));
-        }
+        if (msg == null && buf.hasRemaining())
+            msg = msgFactory.create(buf.get());
 
         boolean finished = false;
 
-        if (buf.hasRemaining()) {
-            if (reader == null)
-                reader = ses.meta(READER_META_KEY);
-
-            assert reader != null;
-
+        if (buf.hasRemaining())
             finished = msg.readFrom(buf, reader);
-        }
 
-        if (finished)
+        if (finished) {
+            reader.reset();
+
             return msg;
+        }
         else {
             ses.addMeta(MSG_META_KEY, msg);
 
@@ -93,4 +90,4 @@ public class GridDirectParser implements GridNioParser {
         // No encoding needed for direct messages.
         throw new UnsupportedEncodingException();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index d40a384..bfc67fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -272,4 +272,21 @@ public interface MessageReader {
      * Increments read state.
      */
     public void incrementState();
-}
\ No newline at end of file
+
+    /**
+     * Callback called before inner message is read.
+     */
+    public void beforeInnerMessageRead();
+
+    /**
+     * Callback called after inner message is read.
+     *
+     * @param finished Whether message was fully read.
+     */
+    public void afterInnerMessageRead(boolean finished);
+
+    /**
+     * Resets this reader.
+     */
+    public void reset();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 906a050..d9334eb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -497,7 +497,7 @@ public class GridSpiTestContext implements IgniteSpiContext 
{
                 }
 
                 @Override public MessageReader reader(MessageFactory factory, 
Class<? extends Message> msgCls) {
-                    return new DirectMessageReader(factory, this);
+                    return new DirectMessageReader(factory);
                 }
             };
         }
@@ -573,4 +573,4 @@ public class GridSpiTestContext implements IgniteSpiContext 
{
             this.obj = obj;
         }
     }
-}
\ No newline at end of file
+}

Reply via email to