Reusing message reader per NIO session
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7123a73d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7123a73d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7123a73d Branch: refs/heads/ignite-direct-marsh-opt Commit: 7123a73d1e80d568588a3cf44ea3e513f484093c Parents: 1f5a409 Author: Valentin Kulichenko <valentin.kuliche...@gmail.com> Authored: Thu Nov 12 21:34:35 2015 -0800 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Thu Nov 12 21:34:35 2015 -0800 ---------------------------------------------------------------------- .../internal/direct/DirectByteBufferStream.java | 53 ++++---- .../internal/direct/DirectMessageReader.java | 92 ++++++++++--- .../direct/DirectMessageReaderState.java | 128 +++++++++++++++++++ .../internal/direct/DirectMessageWriter.java | 4 +- .../managers/communication/GridIoManager.java | 45 ++++--- .../internal/util/nio/GridDirectParser.java | 27 ++-- .../extensions/communication/MessageReader.java | 19 ++- .../testframework/GridSpiTestContext.java | 4 +- 8 files changed, 287 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java index cf56430..dc6e419 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectByteBufferStream.java @@ -32,7 +32,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFormatter; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import sun.misc.Unsafe; @@ -220,9 +219,6 @@ public class DirectByteBufferStream { private final MessageFactory msgFactory; /** */ - private final MessageFormatter msgFormatter; - - /** */ private ByteBuffer buf; /** */ @@ -288,16 +284,11 @@ public class DirectByteBufferStream { /** */ private boolean lastFinished; - /** */ - private MessageReader reader; - /** * @param msgFactory Message factory. - * @param msgFormatter Message formatter. */ - public DirectByteBufferStream(MessageFactory msgFactory, MessageFormatter msgFormatter) { + public DirectByteBufferStream(MessageFactory msgFactory) { this.msgFactory = msgFactory; - this.msgFormatter = msgFormatter; } /** @@ -942,7 +933,7 @@ public class DirectByteBufferStream { * @return Message. */ @SuppressWarnings("unchecked") - public <T extends Message> T readMessage() { + public <T extends Message> T readMessage(MessageReader reader) { if (!msgTypeDone) { if (!buf.hasRemaining()) { lastFinished = false; @@ -954,13 +945,21 @@ public class DirectByteBufferStream { msg = type == Byte.MIN_VALUE ? null : msgFactory.create(type); - if (msg != null) - reader = msgFormatter.reader(msgFactory, msg.getClass()); - msgTypeDone = true; } - lastFinished = msg == null || msg.readFrom(buf, reader); + if (msg != null) { + try { + reader.beforeInnerMessageRead(); + + lastFinished = msg.readFrom(buf, reader); + } + finally { + reader.afterInnerMessageRead(lastFinished); + } + } + else + lastFinished = true; if (lastFinished) { Message msg0 = msg; @@ -977,10 +976,11 @@ public class DirectByteBufferStream { /** * @param itemType Component type. * @param itemCls Component class. + * @param reader Reader. * @return Array. */ @SuppressWarnings("unchecked") - public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls) { + public <T> T[] readObjectArray(MessageCollectionItemType itemType, Class<T> itemCls, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -995,7 +995,7 @@ public class DirectByteBufferStream { objArr = itemCls != null ? (Object[])Array.newInstance(itemCls, readSize) : new Object[readSize]; for (int i = readItems; i < readSize; i++) { - Object item = read(itemType); + Object item = read(itemType, reader); if (!lastFinished) return null; @@ -1019,10 +1019,11 @@ public class DirectByteBufferStream { /** * @param itemType Item type. + * @param reader Reader. * @return Collection. */ @SuppressWarnings("unchecked") - public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType) { + public <C extends Collection<?>> C readCollection(MessageCollectionItemType itemType, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1037,7 +1038,7 @@ public class DirectByteBufferStream { col = new ArrayList<>(readSize); for (int i = readItems; i < readSize; i++) { - Object item = read(itemType); + Object item = read(itemType, reader); if (!lastFinished) return null; @@ -1063,11 +1064,12 @@ public class DirectByteBufferStream { * @param keyType Key type. * @param valType Value type. * @param linked Whether linked map should be created. + * @param reader Reader. * @return Map. */ @SuppressWarnings("unchecked") public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType, MessageCollectionItemType valType, - boolean linked) { + boolean linked, MessageReader reader) { if (readSize == -1) { int size = readInt(); @@ -1083,7 +1085,7 @@ public class DirectByteBufferStream { for (int i = readItems; i < readSize; i++) { if (!keyDone) { - Object key = read(keyType); + Object key = read(keyType, reader); if (!lastFinished) return null; @@ -1092,7 +1094,7 @@ public class DirectByteBufferStream { keyDone = true; } - Object val = read(valType); + Object val = read(valType, reader); if (!lastFinished) return null; @@ -1389,9 +1391,10 @@ public class DirectByteBufferStream { /** * @param type Type. + * @param reader Reader. * @return Value. */ - private Object read(MessageCollectionItemType type) { + private Object read(MessageCollectionItemType type, MessageReader reader) { switch (type) { case BYTE: return readByte(); @@ -1454,7 +1457,7 @@ public class DirectByteBufferStream { return readIgniteUuid(); case MSG: - return readMessage(); + return readMessage(reader); default: throw new IllegalArgumentException("Unknown type: " + type); @@ -1496,4 +1499,4 @@ public class DirectByteBufferStream { */ public T create(int len); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java index 7eaab76..2f91fbd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java @@ -26,7 +26,6 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFormatter; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.jetbrains.annotations.Nullable; @@ -34,26 +33,22 @@ import org.jetbrains.annotations.Nullable; * Message reader implementation. */ public class DirectMessageReader implements MessageReader { - /** Stream. */ - private final DirectByteBufferStream stream; + /** State. */ + private final DirectMessageReaderState state; /** Whether last field was fully read. */ private boolean lastRead; - /** Current state. */ - private int state; - /** * @param msgFactory Message factory. - * @param msgFormatter Message formatter. */ - public DirectMessageReader(MessageFactory msgFactory, MessageFormatter msgFormatter) { - this.stream = new DirectByteBufferStream(msgFactory, msgFormatter); + public DirectMessageReader(MessageFactory msgFactory) { + state = new DirectMessageReaderState(msgFactory); } /** {@inheritDoc} */ @Override public void setBuffer(ByteBuffer buf) { - stream.setBuffer(buf); + state.stream().setBuffer(buf); } /** {@inheritDoc} */ @@ -69,6 +64,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public byte readByte(String name) { + DirectByteBufferStream stream = state.stream(); + byte val = stream.readByte(); lastRead = stream.lastFinished(); @@ -78,6 +75,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public short readShort(String name) { + DirectByteBufferStream stream = state.stream(); + short val = stream.readShort(); lastRead = stream.lastFinished(); @@ -87,6 +86,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public int readInt(String name) { + DirectByteBufferStream stream = state.stream(); + int val = stream.readInt(); lastRead = stream.lastFinished(); @@ -96,6 +97,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public long readLong(String name) { + DirectByteBufferStream stream = state.stream(); + long val = stream.readLong(); lastRead = stream.lastFinished(); @@ -105,6 +108,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public float readFloat(String name) { + DirectByteBufferStream stream = state.stream(); + float val = stream.readFloat(); lastRead = stream.lastFinished(); @@ -114,6 +119,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public double readDouble(String name) { + DirectByteBufferStream stream = state.stream(); + double val = stream.readDouble(); lastRead = stream.lastFinished(); @@ -123,6 +130,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public char readChar(String name) { + DirectByteBufferStream stream = state.stream(); + char val = stream.readChar(); lastRead = stream.lastFinished(); @@ -132,6 +141,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public boolean readBoolean(String name) { + DirectByteBufferStream stream = state.stream(); + boolean val = stream.readBoolean(); lastRead = stream.lastFinished(); @@ -141,6 +152,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public byte[] readByteArray(String name) { + DirectByteBufferStream stream = state.stream(); + byte[] arr = stream.readByteArray(); lastRead = stream.lastFinished(); @@ -150,6 +163,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public short[] readShortArray(String name) { + DirectByteBufferStream stream = state.stream(); + short[] arr = stream.readShortArray(); lastRead = stream.lastFinished(); @@ -159,6 +174,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public int[] readIntArray(String name) { + DirectByteBufferStream stream = state.stream(); + int[] arr = stream.readIntArray(); lastRead = stream.lastFinished(); @@ -168,6 +185,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public long[] readLongArray(String name) { + DirectByteBufferStream stream = state.stream(); + long[] arr = stream.readLongArray(); lastRead = stream.lastFinished(); @@ -177,6 +196,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public float[] readFloatArray(String name) { + DirectByteBufferStream stream = state.stream(); + float[] arr = stream.readFloatArray(); lastRead = stream.lastFinished(); @@ -186,6 +207,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public double[] readDoubleArray(String name) { + DirectByteBufferStream stream = state.stream(); + double[] arr = stream.readDoubleArray(); lastRead = stream.lastFinished(); @@ -195,6 +218,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public char[] readCharArray(String name) { + DirectByteBufferStream stream = state.stream(); + char[] arr = stream.readCharArray(); lastRead = stream.lastFinished(); @@ -204,6 +229,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public boolean[] readBooleanArray(String name) { + DirectByteBufferStream stream = state.stream(); + boolean[] arr = stream.readBooleanArray(); lastRead = stream.lastFinished(); @@ -213,6 +240,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public String readString(String name) { + DirectByteBufferStream stream = state.stream(); + String val = stream.readString(); lastRead = stream.lastFinished(); @@ -222,6 +251,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public BitSet readBitSet(String name) { + DirectByteBufferStream stream = state.stream(); + BitSet val = stream.readBitSet(); lastRead = stream.lastFinished(); @@ -231,6 +262,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public UUID readUuid(String name) { + DirectByteBufferStream stream = state.stream(); + UUID val = stream.readUuid(); lastRead = stream.lastFinished(); @@ -240,6 +273,8 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public IgniteUuid readIgniteUuid(String name) { + DirectByteBufferStream stream = state.stream(); + IgniteUuid val = stream.readIgniteUuid(); lastRead = stream.lastFinished(); @@ -249,7 +284,9 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Nullable @Override public <T extends Message> T readMessage(String name) { - T msg = stream.readMessage(); + DirectByteBufferStream stream = state.stream(); + + T msg = stream.readMessage(this); lastRead = stream.lastFinished(); @@ -258,7 +295,9 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <T> T[] readObjectArray(String name, MessageCollectionItemType itemType, Class<T> itemCls) { - T[] msg = stream.readObjectArray(itemType, itemCls); + DirectByteBufferStream stream = state.stream(); + + T[] msg = stream.readObjectArray(itemType, itemCls, this); lastRead = stream.lastFinished(); @@ -267,7 +306,9 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <C extends Collection<?>> C readCollection(String name, MessageCollectionItemType itemType) { - C col = stream.readCollection(itemType); + DirectByteBufferStream stream = state.stream(); + + C col = stream.readCollection(itemType, this); lastRead = stream.lastFinished(); @@ -277,7 +318,9 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public <M extends Map<?, ?>> M readMap(String name, MessageCollectionItemType keyType, MessageCollectionItemType valType, boolean linked) { - M map = stream.readMap(keyType, valType, linked); + DirectByteBufferStream stream = state.stream(); + + M map = stream.readMap(keyType, valType, linked, this); lastRead = stream.lastFinished(); @@ -291,11 +334,26 @@ public class DirectMessageReader implements MessageReader { /** {@inheritDoc} */ @Override public int state() { - return state; + return state.state(); } /** {@inheritDoc} */ @Override public void incrementState() { - state++; + state.incrementState(); + } + + /** {@inheritDoc} */ + @Override public void beforeInnerMessageRead() { + state.beforeInnerMessageRead(); + } + + /** {@inheritDoc} */ + @Override public void afterInnerMessageRead(boolean finished) { + state.afterInnerMessageRead(finished); + } + + /** {@inheritDoc} */ + @Override public void reset() { + state.reset(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java new file mode 100644 index 0000000..d423052 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReaderState.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.direct; + +import org.apache.ignite.plugin.extensions.communication.MessageFactory; + +/** + * Writer state. + */ +public class DirectMessageReaderState { + /** Initial array size. */ + private static final int INIT_SIZE = 10; + + /** Message factory. */ + private final MessageFactory msgFactory; + + /** Stack array. */ + private StateItem[] stack; + + /** Current position. */ + private int pos; + + /** + * @param msgFactory Message factory. + */ + public DirectMessageReaderState(MessageFactory msgFactory) { + this.msgFactory = msgFactory; + + stack = new StateItem[INIT_SIZE]; + + stack[0] = new StateItem(msgFactory); + } + + /** + * @return Current state. + */ + public int state() { + return stack[pos].state; + } + + /** + * Increments state. + */ + public void incrementState() { + stack[pos].state++; + } + + /** + * @return Current stream. + */ + public DirectByteBufferStream stream() { + return stack[pos].stream; + } + + /** + * Callback called before inner message is written. + */ + public void beforeInnerMessageRead() { + pos++; + + // Growing never happen for Ignite messages, but we need + // to support it for custom messages from plugins. + if (pos == stack.length) { + StateItem[] stack0 = stack; + + stack = new StateItem[stack.length << 1]; + + System.arraycopy(stack0, 0, stack, 0, stack0.length); + } + + if (stack[pos] == null) + stack[pos] = new StateItem(msgFactory); + } + + /** + * Callback called after inner message is written. + * + * @param finished Whether message was fully written. + */ + public void afterInnerMessageRead(boolean finished) { + if (finished) + stack[pos].state = 0; + + pos--; + } + + /** + * Resets state. + */ + public void reset() { + assert pos == 0; + + stack[0].state = 0; + } + + /** + * State item. + */ + private static class StateItem { + /** Stream. */ + private final DirectByteBufferStream stream; + + /** State. */ + private int state; + + /** + * @param msgFactory Message factory. + */ + public StateItem(MessageFactory msgFactory) { + stream = new DirectByteBufferStream(msgFactory); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java index ea0f37e..3132e3a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java @@ -33,7 +33,7 @@ import org.jetbrains.annotations.Nullable; */ public class DirectMessageWriter implements MessageWriter { /** Stream. */ - private final DirectByteBufferStream stream = new DirectByteBufferStream(null, null); + private final DirectByteBufferStream stream = new DirectByteBufferStream(null); /** State. */ private final DirectMessageWriterState state = new DirectMessageWriterState(); @@ -260,4 +260,4 @@ public class DirectMessageWriter implements MessageWriter { @Override public void reset() { state.reset(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index b8af8da..3d4fbef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -17,6 +17,26 @@ package org.apache.ignite.internal.managers.communication; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -64,27 +84,6 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -282,7 +281,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - return new DirectMessageReader(msgFactory, this); + return new DirectMessageReader(msgFactory); } }; } @@ -2432,4 +2431,4 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return S.toString(DelayedMessage.class, this, super.toString()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 987090d..5140cce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -58,29 +58,26 @@ public class GridDirectParser implements GridNioParser { /** {@inheritDoc} */ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { - Message msg = ses.removeMeta(MSG_META_KEY); + MessageReader reader = ses.meta(READER_META_KEY); - MessageReader reader = null; + if (reader == null) + ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, null)); // TODO: class in null - if (msg == null && buf.hasRemaining()) { - msg = msgFactory.create(buf.get()); + Message msg = ses.removeMeta(MSG_META_KEY); - ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, msg.getClass())); - } + if (msg == null && buf.hasRemaining()) + msg = msgFactory.create(buf.get()); boolean finished = false; - if (buf.hasRemaining()) { - if (reader == null) - reader = ses.meta(READER_META_KEY); - - assert reader != null; - + if (buf.hasRemaining()) finished = msg.readFrom(buf, reader); - } - if (finished) + if (finished) { + reader.reset(); + return msg; + } else { ses.addMeta(MSG_META_KEY, msg); @@ -93,4 +90,4 @@ public class GridDirectParser implements GridNioParser { // No encoding needed for direct messages. throw new UnsupportedEncodingException(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index d40a384..bfc67fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -272,4 +272,21 @@ public interface MessageReader { * Increments read state. */ public void incrementState(); -} \ No newline at end of file + + /** + * Callback called before inner message is read. + */ + public void beforeInnerMessageRead(); + + /** + * Callback called after inner message is read. + * + * @param finished Whether message was fully read. + */ + public void afterInnerMessageRead(boolean finished); + + /** + * Resets this reader. + */ + public void reset(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7123a73d/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 906a050..d9334eb 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -497,7 +497,7 @@ public class GridSpiTestContext implements IgniteSpiContext { } @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - return new DirectMessageReader(factory, this); + return new DirectMessageReader(factory); } }; } @@ -573,4 +573,4 @@ public class GridSpiTestContext implements IgniteSpiContext { this.obj = obj; } } -} \ No newline at end of file +}