http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java index 987090d..aa88808 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; -import org.apache.ignite.plugin.extensions.communication.MessageFormatter; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.jetbrains.annotations.Nullable; @@ -41,46 +40,48 @@ public class GridDirectParser implements GridNioParser { private final MessageFactory msgFactory; /** */ - private final MessageFormatter formatter; + private final GridNioMessageReaderFactory readerFactory; /** * @param msgFactory Message factory. - * @param formatter Formatter. + * @param readerFactory Message reader factory. */ - public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) { + public GridDirectParser(MessageFactory msgFactory, GridNioMessageReaderFactory readerFactory) { assert msgFactory != null; - assert formatter != null; + assert readerFactory != null; this.msgFactory = msgFactory; - this.formatter = formatter; + this.readerFactory = readerFactory; } /** {@inheritDoc} */ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { - Message msg = ses.removeMeta(MSG_META_KEY); + MessageReader reader = ses.meta(READER_META_KEY); - MessageReader reader = null; + if (reader == null) + ses.addMeta(READER_META_KEY, reader = readerFactory.reader(ses, msgFactory)); - if (msg == null && buf.hasRemaining()) { - msg = msgFactory.create(buf.get()); + Message msg = ses.removeMeta(MSG_META_KEY); - ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, msg.getClass())); - } + if (msg == null && buf.hasRemaining()) + msg = msgFactory.create(buf.get()); boolean finished = false; if (buf.hasRemaining()) { - if (reader == null) - reader = ses.meta(READER_META_KEY); - - assert reader != null; + if (reader != null) + reader.setCurrentReadClass(msg.getClass()); finished = msg.readFrom(buf, reader); } - if (finished) + if (finished) { + if (reader != null) + reader.reset(); + return msg; + } else { ses.addMeta(MSG_META_KEY, msg); @@ -93,4 +94,4 @@ public class GridDirectParser implements GridNioParser { // No encoding needed for direct messages. throw new UnsupportedEncodingException(); } -} \ No newline at end of file +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java new file mode 100644 index 0000000..cb7e62d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageReader; + +/** + * Message reader factory. + */ +public interface GridNioMessageReaderFactory { + /** + * Creates new reader. + * + * @param ses Current session. + * @param msgFactory Message factory. + * @return Reader. + * @throws IgniteCheckedException In case of error. + */ + public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java new file mode 100644 index 0000000..6c4dee4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Message writer factory. + */ +public interface GridNioMessageWriterFactory { + /** + * Creates new writer. + * + * @param ses Current session. + * @return Writer. + * @throws IgniteCheckedException In case of error. + */ + public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 5bd08e7..384ee1a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -62,7 +62,6 @@ import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageFormatter; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -176,7 +175,7 @@ public class GridNioServer<T> { /** */ @GridToStringExclude - private MessageFormatter formatter; + private GridNioMessageWriterFactory writerFactory; /** */ @GridToStringExclude @@ -212,7 +211,7 @@ public class GridNioServer<T> { * @param directMode Whether direct mode is used. * @param daemon Daemon flag to create threads. * @param metricsLsnr Metrics listener. - * @param formatter Message formatter. + * @param writerFactory Writer factory. * @param skipRecoveryPred Skip recovery predicate. * @param msgQueueLsnr Message queue size listener. * @param filters Filters for this server. @@ -234,7 +233,7 @@ public class GridNioServer<T> { boolean directMode, boolean daemon, GridNioMetricsListener metricsLsnr, - MessageFormatter formatter, + GridNioMessageWriterFactory writerFactory, IgnitePredicate<Message> skipRecoveryPred, IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr, GridNioFilter... filters @@ -304,7 +303,7 @@ public class GridNioServer<T> { this.directMode = directMode; this.metricsLsnr = metricsLsnr; - this.formatter = formatter; + this.writerFactory = writerFactory; this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse(); } @@ -934,8 +933,14 @@ public class GridNioServer<T> { MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - if (writer == null) - ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer()); + if (writer == null) { + try { + ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses)); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to create message writer.", e); + } + } boolean handshakeFinished = sslFilter.lock(ses); @@ -1129,8 +1134,14 @@ public class GridNioServer<T> { MessageWriter writer = ses.meta(MSG_WRITER.ordinal()); - if (writer == null) - ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer()); + if (writer == null) { + try { + ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses)); + } + catch (IgniteCheckedException e) { + throw new IOException("Failed to create message writer.", e); + } + } if (req == null) { req = (NioOperationFuture<?>)ses.pollFuture(); @@ -1152,7 +1163,7 @@ public class GridNioServer<T> { finished = msg.writeTo(buf, writer); - if (finished) + if (finished && writer != null) writer.reset(); } @@ -1176,7 +1187,7 @@ public class GridNioServer<T> { finished = msg.writeTo(buf, writer); - if (finished) + if (finished && writer != null) writer.reset(); } @@ -2221,8 +2232,8 @@ public class GridNioServer<T> { /** Daemon flag. */ private boolean daemon; - /** Message formatter. */ - private MessageFormatter formatter; + /** Writer factory. */ + private GridNioMessageWriterFactory writerFactory; /** Skip recovery predicate. */ private IgnitePredicate<Message> skipRecoveryPred; @@ -2253,7 +2264,7 @@ public class GridNioServer<T> { directMode, daemon, metricsLsnr, - formatter, + writerFactory, skipRecoveryPred, msgQueueLsnr, filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS @@ -2450,11 +2461,11 @@ public class GridNioServer<T> { } /** - * @param formatter Message formatter. + * @param writerFactory Writer factory. * @return This for chaining. */ - public Builder<T> messageFormatter(MessageFormatter formatter) { - this.formatter = formatter; + public Builder<T> writerFactory(GridNioMessageWriterFactory writerFactory) { + this.writerFactory = writerFactory; return this; } @@ -2479,4 +2490,4 @@ public class GridNioServer<T> { return this; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java index 93e789d..ebe86fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFormatter; -import org.jetbrains.annotations.Nullable; /** * @@ -120,16 +119,17 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien } /** {@inheritDoc} */ - @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg, - IgniteInClosure<IgniteException> closure) - throws IgniteCheckedException { + @Override public synchronized boolean sendMessage(UUID nodeId, Message msg, + IgniteInClosure<IgniteException> closure) throws IgniteCheckedException { + assert nodeId != null; + if (closed()) throw new IgniteCheckedException("Communication client was closed: " + this); assert writeBuf.hasArray(); try { - int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer()); + int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer(nodeId)); metricsLsnr.onBytesSent(cnt); } @@ -154,4 +154,4 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien @Override public String toString() { return S.toString(GridShmemCommunicationClient.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java index 4c9e0ee..7bc2a53 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java @@ -17,6 +17,8 @@ package org.apache.ignite.plugin.extensions.communication; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.plugin.Extension; /** @@ -33,16 +35,19 @@ public interface MessageFormatter extends Extension { /** * Creates new message writer instance. * + * @param rmtNodeId Remote node ID. * @return Message writer. + * @throws IgniteCheckedException In case of error. */ - public MessageWriter writer(); + public MessageWriter writer(UUID rmtNodeId) throws IgniteCheckedException; /** * Creates new message reader instance. * - * @param factory Message factory. - * @param msgCls Message class to read. + * @param rmtNodeId Remote node ID. + * @param msgFactory Message factory. * @return Message reader. + * @throws IgniteCheckedException In case of error. */ - public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls); -} \ No newline at end of file + public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java index d40a384..502c69f 100644 --- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java +++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java @@ -39,6 +39,13 @@ public interface MessageReader { public void setBuffer(ByteBuffer buf); /** + * Sets type of message currently read. + * + * @param msgCls Message type. + */ + public void setCurrentReadClass(Class<? extends Message> msgCls); + + /** * Callback that must be invoked by a message implementation before message body started decoding. * * @return {@code True} if reading can proceed, {@code false} otherwise. @@ -272,4 +279,21 @@ public interface MessageReader { * Increments read state. */ public void incrementState(); -} \ No newline at end of file + + /** + * Callback called before inner message is read. + */ + public void beforeInnerMessageRead(); + + /** + * Callback called after inner message is read. + * + * @param finished Whether message was fully read. + */ + public void afterInnerMessageRead(boolean finished); + + /** + * Resets this reader. + */ + public void reset(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index cd46de4..1cb202c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -696,11 +696,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement if (msgFormatter0 == null) { msgFormatter0 = new MessageFormatter() { - @Override public MessageWriter writer() { + @Override public MessageWriter writer(UUID rmtNodeId) { throw new IgniteException("Failed to write message, node is not started."); } - @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { + @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) { throw new IgniteException("Failed to read message, node is not started."); } }; @@ -880,4 +880,4 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj)); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index e8bd8a1..68e2f43 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -76,7 +76,9 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter; import org.apache.ignite.internal.util.nio.GridDirectParser; import org.apache.ignite.internal.util.nio.GridNioCodecFilter; import org.apache.ignite.internal.util.nio.GridNioFilter; +import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory; import org.apache.ignite.internal.util.nio.GridNioMessageTracker; +import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory; import org.apache.ignite.internal.util.nio.GridNioMetricsListener; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -1575,29 +1577,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - MessageFormatter msgFormatter = new MessageFormatter() { - private MessageFormatter impl; + GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() { + private MessageFormatter formatter; - @Override public MessageWriter writer() { - if (impl == null) - impl = getSpiContext().messageFormatter(); + @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) + throws IgniteCheckedException { + if (formatter == null) + formatter = getSpiContext().messageFormatter(); - assert impl != null; + assert formatter != null; - return impl.writer(); + UUID rmtNodeId = ses.meta(NODE_ID_META); + + return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; } + }; - @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - if (impl == null) - impl = getSpiContext().messageFormatter(); + GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() { + private MessageFormatter formatter; - assert impl != null; + @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException { + if (formatter == null) + formatter = getSpiContext().messageFormatter(); + + assert formatter != null; + + UUID rmtNodeId = ses.meta(NODE_ID_META); - return impl.reader(factory, msgCls); + return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; } }; - GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter); + GridDirectParser parser = new GridDirectParser(msgFactory, readerFactory); IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() { @Override public boolean apply(Message msg) { @@ -1658,7 +1669,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter .metricsListener(metricsLsnr) .writeTimeout(sockWriteTimeout) .filters(filters) - .messageFormatter(msgFormatter) + .writerFactory(writerFactory) .skipRecoveryPredicate(skipRecoveryPred) .messageQueueSizeListener(queueSizeMonitor) .build(); @@ -1918,7 +1929,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter UUID nodeId = null; - if (!client.async() && !locNode.version().equals(node.version())) + if (!client.async()) nodeId = node.id(); retry = client.sendMessage(nodeId, msg, ackC); @@ -2591,7 +2602,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter buf.order(ByteOrder.nativeOrder()); - boolean written = msg.writeTo(buf, getSpiContext().messageFormatter().writer()); + boolean written = msg.writeTo(buf, null); assert written; @@ -2932,25 +2943,34 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } }; - MessageFormatter msgFormatter = new MessageFormatter() { - private MessageFormatter impl; + GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() { + private MessageFormatter formatter; - @Override public MessageWriter writer() { - if (impl == null) - impl = getSpiContext().messageFormatter(); + @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException { + if (formatter == null) + formatter = getSpiContext().messageFormatter(); - assert impl != null; + assert formatter != null; - return impl.writer(); + UUID rmtNodeId = ses.meta(NODE_ID_META); + + return rmtNodeId != null ? formatter.writer(rmtNodeId) : null; } + }; - @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - if (impl == null) - impl = getSpiContext().messageFormatter(); + GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() { + private MessageFormatter formatter; - assert impl != null; + @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) + throws IgniteCheckedException { + if (formatter == null) + formatter = getSpiContext().messageFormatter(); + + assert formatter != null; + + UUID rmtNodeId = ses.meta(NODE_ID_META); - return impl.reader(factory, msgCls); + return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null; } }; @@ -2959,8 +2979,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log, endpoint, srvLsnr, - msgFormatter, - new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true), + writerFactory, + new GridNioCodecFilter(new GridDirectParser(msgFactory, readerFactory), log, true), new GridConnectionBytesVerifyFilter(log) ); http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java index f0f41bf..176973e 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java @@ -29,6 +29,11 @@ import org.apache.ignite.cache.CacheEntryProcessor; /** * Convenience adapter to transform update existing values in streaming cache * based on the previously cached value. + * <p> + * This transformer implement {@link EntryProcessor} and internally will call + * {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method. Note + * that the value received from the data streamer will be passed to the entry + * processor as an argument. */ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, EntryProcessor<K, V, Object> { /** */ @@ -37,7 +42,7 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E /** {@inheritDoc} */ @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException { for (Map.Entry<K, V> entry : entries) - cache.invoke(entry.getKey(), this); + cache.invoke(entry.getKey(), this, entry.getValue()); } /** @@ -53,4 +58,4 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E } }; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java index 906a050..e257a97 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java @@ -37,12 +37,12 @@ import org.apache.ignite.events.TaskEvent; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; +import org.apache.ignite.internal.managers.communication.GridIoManager; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFormatter; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -492,12 +492,12 @@ public class GridSpiTestContext implements IgniteSpiContext { @Override public MessageFormatter messageFormatter() { if (formatter == null) { formatter = new MessageFormatter() { - @Override public MessageWriter writer() { - return new DirectMessageWriter(); + @Override public MessageWriter writer(UUID rmtNodeId) { + return new DirectMessageWriter(GridIoManager.DIRECT_PROTO_VER); } - @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) { - return new DirectMessageReader(factory, this); + @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) { + return new DirectMessageReader(msgFactory, GridIoManager.DIRECT_PROTO_VER); } }; } @@ -573,4 +573,4 @@ public class GridSpiTestContext implements IgniteSpiContext { this.obj = obj; } } -} \ No newline at end of file +}