This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 63892a7902f IGNITE-26976 Add compression capability to the
serialization framework (#12527)
63892a7902f is described below
commit 63892a7902f3abccd350185e1a46be2304b334cc
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Feb 27 17:11:14 2026 +0500
IGNITE-26976 Add compression capability to the serialization framework
(#12527)
---
.../java/org/apache/ignite/internal/Compress.java | 30 ++
.../internal/MessageSerializerGenerator.java | 74 ++++-
.../internal/direct/DirectMessageReader.java | 80 +++++-
.../internal/direct/DirectMessageWriter.java | 108 +++++++-
.../direct/stream/DirectByteBufferStream.java | 37 +++
.../managers/communication/CompressedMessage.java | 303 +++++++++++++++++++++
.../managers/communication/GridIoManager.java | 2 +-
.../communication/GridIoMessageFactory.java | 11 +
.../cache/GridCachePartitionExchangeManager.java | 23 +-
.../dht/preloader/GridDhtPartitionFullMap.java | 103 ++++++-
.../dht/preloader/GridDhtPartitionMap.java | 27 +-
.../GridDhtPartitionsAbstractMessage.java | 17 --
.../preloader/GridDhtPartitionsExchangeFuture.java | 18 +-
.../preloader/GridDhtPartitionsFullMessage.java | 124 +++------
.../preloader/GridDhtPartitionsSingleMessage.java | 52 +---
.../internal/util/GridPartitionStateMap.java | 18 +-
.../extensions/communication/MessageReader.java | 38 ++-
.../extensions/communication/MessageWriter.java | 36 ++-
.../tcp/internal/GridNioServerWrapper.java | 2 +-
.../internal/codegen/MessageProcessorTest.java | 34 +++
.../AbstractMessageSerializationTest.java | 18 +-
.../communication/CompressedMessageTest.java | 147 ++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
.../codegen/TestCollectionsCompressedMessage.java | 31 +++
.../TestCompressUnsupportedTypeMessage.java | 31 +++
.../resources/codegen/TestCompressedMessage.java | 30 ++
.../codegen/TestMapCompressedMessage.java | 31 +++
27 files changed, 1203 insertions(+), 224 deletions(-)
diff --git
a/modules/codegen/src/main/java/org/apache/ignite/internal/Compress.java
b/modules/codegen/src/main/java/org/apache/ignite/internal/Compress.java
new file mode 100644
index 00000000000..65dea1348f8
--- /dev/null
+++ b/modules/codegen/src/main/java/org/apache/ignite/internal/Compress.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a message field whose value must be compressed during serialization.
+ * <p>
+ * The annotation is looked up by {@code MessageSerializerGenerator} while it generates serializer code.
+ * Only fields of {@code Map} or {@code Message} types are accepted: for any other type the generator
+ * throws an {@link IllegalArgumentException}.
+ */
+@Retention(RetentionPolicy.CLASS)
+@Target(ElementType.FIELD)
+public @interface Compress {
+ // No-op.
+}
diff --git
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index 269407c73ee..51b756c49fb 100644
---
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -88,6 +88,10 @@ public class MessageSerializerGenerator {
/** */
static final String DLFT_ENUM_MAPPER_CLS =
"org.apache.ignite.plugin.extensions.communication.mappers.DefaultEnumMapper";
+ /** */
+ private static final String COMPRESSED_MSG_ERROR = "CompressedMessage
should not be used explicitly. " +
+ "To compress the required field use the @Compress annotation.";
+
/** Collection of lines for {@code writeTo} method. */
private final List<String> write = new ArrayList<>();
@@ -320,6 +324,11 @@ public class MessageSerializerGenerator {
TypeMirror type = field.asType();
+ boolean compress = field.getAnnotation(Compress.class) != null;
+
+ if (compress)
+ checkTypeForCompress(type);
+
if (type.getKind().isPrimitive()) {
String typeName = capitalizeOnlyFirst(type.getKind().name());
@@ -371,9 +380,15 @@ public class MessageSerializerGenerator {
imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType");
- returnFalseIfWriteFailed(write, field, "writer.writeMap",
getExpr,
- "MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(0)),
- "MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(1)));
+ List<String> args = new ArrayList<>();
+ args.add(getExpr);
+ args.add("MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(0)));
+ args.add("MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(1)));
+
+ if (compress)
+ args.add("true"); // the value of the compress argument in
the MessageWriter#writeMap method
+
+ returnFalseIfWriteFailed(write, field, "writer.writeMap",
args.toArray(String[]::new));
}
else if (assignableFrom(type,
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
@@ -385,8 +400,15 @@ public class MessageSerializerGenerator {
else if (assignableFrom(type,
type("org.apache.ignite.internal.util.GridLongList")))
returnFalseIfWriteFailed(write, field,
"writer.writeGridLongList", getExpr);
- else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
- returnFalseIfWriteFailed(write, field, "writer.writeMessage",
getExpr);
+ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
+ if (sameType(type,
"org.apache.ignite.internal.managers.communication.CompressedMessage"))
+ throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
+
+ if (compress)
+ returnFalseIfWriteFailed(write, field,
"writer.writeMessage", getExpr, "true");
+ else
+ returnFalseIfWriteFailed(write, field,
"writer.writeMessage", getExpr);
+ }
else if (assignableFrom(erasedType(type),
type(Collection.class.getName()))) {
List<? extends TypeMirror> typeArgs =
((DeclaredType)type).getTypeArguments();
@@ -530,6 +552,11 @@ public class MessageSerializerGenerator {
private void returnFalseIfReadFailed(VariableElement field) throws
Exception {
TypeMirror type = field.asType();
+ boolean compress = field.getAnnotation(Compress.class) != null;
+
+ if (compress)
+ checkTypeForCompress(type);
+
if (type.getKind().isPrimitive()) {
String typeName = capitalizeOnlyFirst(type.getKind().name());
@@ -602,9 +629,15 @@ public class MessageSerializerGenerator {
assert typeArgs.size() == 2;
- returnFalseIfReadFailed(field, "reader.readMap",
- "MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(0)),
- "MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(1)), "false");
+ List<String> args = new ArrayList<>();
+ args.add("MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(0)));
+ args.add("MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(1)));
+ args.add("false"); // the value of the linked argument in the
MessageReader#readMap method
+
+ if (compress)
+ args.add("true"); // the value of the compress argument in
the MessageReader#readMap method
+
+ returnFalseIfReadFailed(field, "reader.readMap",
args.toArray(String[]::new));
}
else if (assignableFrom(type,
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
@@ -616,8 +649,15 @@ public class MessageSerializerGenerator {
else if (assignableFrom(type,
type("org.apache.ignite.internal.util.GridLongList")))
returnFalseIfReadFailed(field, "reader.readGridLongList");
- else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
- returnFalseIfReadFailed(field, "reader.readMessage");
+ else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
+ if (sameType(type,
"org.apache.ignite.internal.managers.communication.CompressedMessage"))
+ throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
+
+ if (compress)
+ returnFalseIfReadFailed(field, "reader.readMessage",
"true");
+ else
+ returnFalseIfReadFailed(field, "reader.readMessage");
+ }
else if (assignableFrom(erasedType(type),
type(Collection.class.getName()))) {
List<? extends TypeMirror> typeArgs =
((DeclaredType)type).getTypeArguments();
@@ -703,6 +743,9 @@ public class MessageSerializerGenerator {
if (primitiveType != null)
return primitiveType.getKind().toString();
+
+ if (sameType(type,
"org.apache.ignite.internal.managers.communication.CompressedMessage"))
+ throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
}
if (!assignableFrom(type, type(MESSAGE_INTERFACE)))
@@ -954,4 +997,15 @@ public class MessageSerializerGenerator {
return sb.toString();
}
+
+ /**
+ * Checks that the Compress annotation is used only for supported types: Map and Message.
+ *
+ * @param type Type of the field annotated with Compress.
+ * @throws IllegalArgumentException If the type is not a declared type, or it passes neither the
+ * {@code assignableFrom} check against Map (on the erased type) nor the one against the Message interface.
+ */
+ private void checkTypeForCompress(TypeMirror type) {
+ if (type.getKind() == TypeKind.DECLARED) { // Any non-declared kind is rejected below.
+ if (assignableFrom(erasedType(type), type(Map.class.getName())) ||
+ assignableFrom(type, type(MESSAGE_INTERFACE)))
+ return;
+ }
+
+ throw new IllegalArgumentException("Compress annotation is used for an
unsupported type: " + type);
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 24d97da7b2d..3217d7630a0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -23,9 +23,11 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Function;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,6 +51,12 @@ public class DirectMessageReader implements MessageReader {
@GridToStringInclude
private final DirectMessageState<StateItem> state;
+ /** Message factory. */
+ private final MessageFactory msgFactory;
+
+ /** Cache object processor. */
+ private final IgniteCacheObjectProcessor cacheObjProc;
+
/** Buffer for reading. */
private ByteBuffer buf;
@@ -60,6 +68,9 @@ public class DirectMessageReader implements MessageReader {
* @param cacheObjProc Cache object processor.
*/
public DirectMessageReader(final MessageFactory msgFactory,
IgniteCacheObjectProcessor cacheObjProc) {
+ this.msgFactory = msgFactory;
+ this.cacheObjProc = cacheObjProc;
+
state = new DirectMessageState<>(StateItem.class, new
IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(msgFactory, cacheObjProc);
@@ -315,12 +326,21 @@ public class DirectMessageReader implements MessageReader
{
}
/** {@inheritDoc} */
- @Nullable @Override public <T extends Message> T readMessage() {
+ @Nullable @Override public <T extends Message> T readMessage(boolean
compress) {
DirectByteBufferStream stream = state.item().stream;
- T msg = stream.readMessage(this);
+ T msg;
- lastRead = stream.lastFinished();
+ if (compress)
+ msg = readCompressedMessageAndDeserialize(
+ stream,
+ tmpReader ->
tmpReader.state.item().stream.readMessage(tmpReader)
+ );
+ else {
+ msg = stream.readMessage(this);
+
+ lastRead = stream.lastFinished();
+ }
return msg;
}
@@ -393,12 +413,21 @@ public class DirectMessageReader implements MessageReader
{
/** {@inheritDoc} */
@Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType
keyType,
- MessageCollectionItemType valType, boolean linked) {
+ MessageCollectionItemType valType, boolean linked, boolean compress) {
DirectByteBufferStream stream = state.item().stream;
- M map = stream.readMap(keyType, valType, linked, this);
+ M map;
- lastRead = stream.lastFinished();
+ if (compress)
+ map = readCompressedMessageAndDeserialize(
+ stream,
+ tmpReader -> tmpReader.state.item().stream.readMap(keyType,
valType, linked, tmpReader)
+ );
+ else {
+ map = stream.readMap(keyType, valType, linked, this);
+
+ lastRead = stream.lastFinished();
+ }
return map;
}
@@ -418,6 +447,11 @@ public class DirectMessageReader implements MessageReader {
state.item().state++;
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Decrements the state counter of the current state item by one.
+ */
+ @Override public void decrementState() {
+ state.item().state--;
+ }
+
/** {@inheritDoc} */
@Override public void beforeInnerMessageRead() {
state.forward();
@@ -440,6 +474,40 @@ public class DirectMessageReader implements MessageReader {
return S.toString(DirectMessageReader.class, this);
}
+ /**
+ * Reads a {@link CompressedMessage} from the given stream, uncompresses its payload and deserializes
+ * the payload with a temporary reader created over a temporary direct buffer.
+ * <p>
+ * {@code lastRead} is set from {@code stream} after the compressed message is read, and again from
+ * the temporary reader's stream after {@code fun} is applied.
+ *
+ * @param stream Stream to read the compressed message from.
+ * @param fun Function that reads the target object using the temporary reader passed to it.
+ * @return Deserialized object, or {@code null} if the compressed message is not fully read yet,
+ * is {@code null}, or reports {@code dataSize() == 0}.
+ */
+ private <T> T readCompressedMessageAndDeserialize(DirectByteBufferStream
stream, Function<DirectMessageReader, T> fun) {
+ Message msg = stream.readMessage(this);
+
+ lastRead = stream.lastFinished();
+
+ if (!lastRead || msg == null)
+ return null;
+
+ assert msg instanceof CompressedMessage : msg;
+
+ CompressedMessage msg0 = (CompressedMessage)msg;
+
+ if (msg0.dataSize() == 0) // Empty payload: nothing to deserialize.
+ return null;
+
+ byte[] uncompressed = msg0.uncompressed();
+
+ ByteBuffer tmpBuf = ByteBuffer.allocateDirect(uncompressed.length);
+
+ tmpBuf.put(uncompressed);
+ tmpBuf.flip(); // Switch the buffer to read mode for the temporary reader.
+
+ DirectMessageReader tmpReader = new DirectMessageReader(msgFactory,
cacheObjProc);
+
+ tmpReader.setBuffer(tmpBuf);
+
+ T res = fun.apply(tmpReader);
+
+ lastRead = tmpReader.state.item().stream.lastFinished();
+
+ return res;
+ }
+
/**
*/
private static class StateItem implements DirectMessageStateItem {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 1da76aa14cf..2cb363f1bd4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -23,9 +23,11 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.function.Consumer;
import org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -40,19 +42,41 @@ import
org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
+import static
org.apache.ignite.configuration.IgniteConfiguration.DFLT_NETWORK_COMPRESSION;
+
/**
* Message writer implementation.
*/
public class DirectMessageWriter implements MessageWriter {
+ /** Temporary buffer capacity. */
+ private static final int TMP_BUF_CAPACITY = 10 * 1024;
+
/** State. */
@GridToStringInclude
private final DirectMessageState<StateItem> state;
+ /** Message factory. */
+ private final MessageFactory msgFactory;
+
+ /** Compression level. Used only for {@link CompressedMessage}. */
+ private final int compressionLvl;
+
/** Buffer for writing. */
private ByteBuffer buf;
- /** */
+ /** @param msgFactory Message factory. */
public DirectMessageWriter(final MessageFactory msgFactory) {
+ this(msgFactory, DFLT_NETWORK_COMPRESSION);
+ }
+
+ /**
+ * @param msgFactory Message factory.
+ * @param compressionLvl Compression level.
+ */
+ public DirectMessageWriter(final MessageFactory msgFactory, final int
compressionLvl) {
+ this.msgFactory = msgFactory;
+ this.compressionLvl = compressionLvl;
+
state = new DirectMessageState<>(StateItem.class, new
IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
return new StateItem(msgFactory);
@@ -293,10 +317,17 @@ public class DirectMessageWriter implements MessageWriter
{
}
/** {@inheritDoc} */
- @Override public boolean writeMessage(@Nullable Message msg) {
+ @Override public boolean writeMessage(@Nullable Message msg, boolean
compress) {
DirectByteBufferStream stream = state.item().stream;
- stream.writeMessage(msg, this);
+ if (compress)
+ writeCompressedMessage(
+ tmpWriter -> tmpWriter.state.item().stream.writeMessage(msg,
tmpWriter),
+ msg == null,
+ stream
+ );
+ else
+ stream.writeMessage(msg, this);
return stream.lastFinished();
}
@@ -353,10 +384,17 @@ public class DirectMessageWriter implements MessageWriter
{
/** {@inheritDoc} */
@Override public <K, V> boolean writeMap(Map<K, V> map,
MessageCollectionItemType keyType,
- MessageCollectionItemType valType) {
+ MessageCollectionItemType valType, boolean compress) {
DirectByteBufferStream stream = state.item().stream;
- stream.writeMap(map, keyType, valType, this);
+ if (compress)
+ writeCompressedMessage(
+ tmpWriter -> tmpWriter.state.item().stream.writeMap(map,
keyType, valType, tmpWriter),
+ map == null,
+ stream
+ );
+ else
+ stream.writeMap(map, keyType, valType, this);
return stream.lastFinished();
}
@@ -381,6 +419,11 @@ public class DirectMessageWriter implements MessageWriter {
state.item().state++;
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Decrements the state counter of the current state item by one.
+ */
+ @Override public void decrementState() {
+ state.item().state--;
+ }
+
/** {@inheritDoc} */
@Override public void beforeInnerMessageWrite() {
state.forward();
@@ -403,6 +446,61 @@ public class DirectMessageWriter implements MessageWriter {
return S.toString(DirectMessageWriter.class, this);
}
+ /**
+ * Writes a field value wrapped into a {@link CompressedMessage}.
+ * <p>
+ * While {@code stream.serializeFinished()} is {@code false}, the value is first serialized by
+ * {@code consumer} into a temporary direct buffer through a temporary writer, wrapped into
+ * a {@link CompressedMessage} and stored in {@code stream}. Calls made while the flag is {@code true}
+ * skip that step and only continue writing the stored message. Both the stored message and the flag
+ * are reset once {@code stream.lastFinished()} reports that the message is written.
+ *
+ * @param consumer Consumer that serializes the field value using the temporary writer passed to it.
+ * @param isNull {@code True} if message is null.
+ * @param stream Byte buffer stream.
+ */
+ private void writeCompressedMessage(Consumer<DirectMessageWriter>
consumer, boolean isNull, DirectByteBufferStream stream) {
+ if (isNull) {
+ stream.writeShort(Short.MIN_VALUE); // NOTE(review): presumably the null-message marker - confirm.
+
+ return;
+ }
+
+ if (!stream.serializeFinished()) {
+ ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY);
+
+ DirectMessageWriter tmpWriter = new
DirectMessageWriter(msgFactory, compressionLvl);
+
+ tmpWriter.setBuffer(tmpBuf);
+
+ boolean finished;
+
+ do {
+ if (tmpBuf.remaining() <= tmpBuf.capacity() / 10) { // At most 1/10 free: grow the buffer.
+ byte[] bytes = new byte[tmpBuf.position()];
+
+ tmpBuf.flip();
+ tmpBuf.get(bytes);
+
+ tmpBuf = ByteBuffer.allocateDirect(tmpBuf.capacity() * 2);
+
+ tmpBuf.put(bytes); // Carry over the bytes written so far.
+
+ tmpWriter.setBuffer(tmpBuf);
+ }
+
+ consumer.accept(tmpWriter);
+
+ finished = tmpWriter.state.item().stream.lastFinished();
+ }
+ while (!finished);
+
+ tmpBuf.flip(); // Switch to read mode before handing the data to CompressedMessage.
+
+ stream.compressedMessage(new CompressedMessage(tmpBuf,
compressionLvl));
+ stream.serializeFinished(true);
+ }
+
+ stream.writeMessage(stream.compressedMessage(), this);
+
+ if (stream.lastFinished()) {
+ stream.compressedMessage(null);
+ stream.serializeFinished(false);
+ }
+ }
+
/**
*/
private static class StateItem implements DirectMessageStateItem {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
index 5c526bf6fd3..ca3ecfc7fbc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -339,6 +340,12 @@ public class DirectByteBufferStream {
/** */
private byte cacheObjType;
+ /** Compressed message being written for a compressed field; reset to {@code null} by the writer once written. */
+ private CompressedMessage compressedMsg;
+
+ /** Whether the compressed field value is already serialized and stored in {@link #compressedMsg}. */
+ private boolean serializeFinished;
+
/**
* Constructror for stream used for writing messages.
*
@@ -390,6 +397,36 @@ public class DirectByteBufferStream {
return lastFinished;
}
+ /**
+ * @return Compressed message. It must have been set before the call (checked by assertion).
+ */
+ public CompressedMessage compressedMessage() {
+ assert compressedMsg != null;
+
+ return compressedMsg;
+ }
+
+ /**
+ * @param compressedMsg Compressed message to store, or {@code null} to clear the stored one.
+ */
+ public void compressedMessage(CompressedMessage compressedMsg) {
+ this.compressedMsg = compressedMsg;
+ }
+
+ /**
+ * @return Whether last object was fully serialized (value of the flag set via
+ * {@link #serializeFinished(boolean)}).
+ */
+ public boolean serializeFinished() {
+ return serializeFinished;
+ }
+
+ /**
+ * @param serializeFinished {@code True} if last object was fully serialized, {@code false} to reset the flag.
+ */
+ public void serializeFinished(boolean serializeFinished) {
+ this.serializeFinished = serializeFinished;
+ }
+
/**
* @param val Value.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
new file mode 100644
index 00000000000..95565217f86
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Internal message used when transmitting fields annotated with @Compress over the network.
+ * <p>
+ * WARNING: CompressedMessage is not intended for explicit use in messages.
+ * <p>
+ * Layout written by {@link #writeTo(ByteBuffer, MessageWriter)} after the header: the raw data size,
+ * then a sequence of pairs "{@code false} flag, chunk of compressed bytes", terminated by a {@code true} flag.
+ * If the raw data size is {@code 0}, nothing is written after it.
+ */
+public class CompressedMessage implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = -101;
+
+ /** Maximum size of a single chunk of compressed data, in bytes. */
+ static final int CHUNK_SIZE = 10 * 1024;
+
+ /** Initial capacity of the temporary buffer allocated on read. */
+ private static final int BUFFER_CAPACITY = 10 * CHUNK_SIZE;
+
+ /** Temporary buffer for compressed data received over the network. */
+ private ByteBuffer tmpBuf;
+
+ /** Size of the raw (uncompressed) data, in bytes. */
+ private int dataSize;
+
+ /** Chunked byte reader over the compressed data; stays {@code null} when there is no data to send. */
+ private ChunkedByteReader chunkedReader;
+
+ /** Chunk currently being written or read. */
+ private byte[] chunk;
+
+ /** End-of-data flag: on write it is {@code true} when the chunked reader has no more chunks. */
+ private boolean finalChunk;
+
+ /** Compression level passed to the {@link Deflater}. */
+ private int compressionLvl;
+
+ /** Empty constructor; the instance is expected to be filled by {@link #readFrom(ByteBuffer, MessageReader)}. */
+ public CompressedMessage() {
+ // No-op.
+ }
+
+ /**
+ * Creates a message for sending: remembers the number of remaining bytes in {@code buf} as the raw
+ * data size and, if it is positive, compresses those bytes right away.
+ *
+ * @param buf Source buffer with serialized data.
+ * @param compressionLvl Compression level.
+ */
+ public CompressedMessage(ByteBuffer buf, int compressionLvl) {
+ dataSize = buf.remaining();
+ this.compressionLvl = compressionLvl;
+
+ if (dataSize > 0)
+ chunkedReader = new ChunkedByteReader(compress(buf));
+ }
+
+ /** @return Raw data size. */
+ public int dataSize() {
+ return dataSize;
+ }
+
+ /**
+ * Must be called only after the final chunk flag is set (checked by assertion).
+ * The temporary buffer is released by the call, so a repeated call returns {@code null}.
+ *
+ * @return Uncompressed data, or {@code null} if there is no temporary buffer.
+ */
+ public byte[] uncompressed() {
+ assert finalChunk;
+
+ return uncompress();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Writer states: {@code 0} - raw data size, {@code 1} - final chunk flag, {@code 2} - chunk bytes.
+ * After a chunk is written the state is decremented back to {@code 1}, so flags and chunks alternate
+ * until the {@code true} flag is written.
+ */
+ @SuppressWarnings("deprecation")
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ while (true) {
+ if (chunk == null && chunkedReader != null) { // Fetch the next chunk only when the previous one is written.
+ chunk = chunkedReader.nextChunk();
+
+ finalChunk = (chunk == null);
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt(dataSize))
+ return false;
+
+ writer.incrementState();
+
+ if (dataSize == 0)
+ return true;
+
+ case 1: // Also reached by fall-through from case 0.
+ if (!writer.writeBoolean(finalChunk))
+ return false;
+
+ writer.incrementState();
+
+ if (finalChunk)
+ return true;
+
+ case 2: // Also reached by fall-through from case 1.
+ if (!writer.writeByteArray(chunk))
+ return false;
+
+ chunk = null;
+
+ writer.decrementState(); // Back to state 1: the next iteration writes the next flag.
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reader states mirror the writer: {@code 0} - raw data size, {@code 1} - final chunk flag,
+ * {@code 2} - chunk bytes. Every received chunk is appended to the temporary buffer, which is
+ * doubled in capacity when its free space is not greater than {@link #CHUNK_SIZE}.
+ */
+ @SuppressWarnings("deprecation")
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (tmpBuf == null)
+ tmpBuf = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
+
+ assert chunk == null : chunk;
+
+ while (true) {
+ switch (reader.state()) {
+ case 0:
+ dataSize = reader.readInt();
+
+ if (!reader.isLastRead())
+ return false;
+
+ if (dataSize == 0)
+ return true;
+
+ reader.incrementState();
+
+ case 1: // Also reached by fall-through from case 0.
+ finalChunk = reader.readBoolean();
+
+ if (!reader.isLastRead())
+ return false;
+
+ if (finalChunk)
+ return true;
+
+ reader.incrementState();
+
+ case 2: // Also reached by fall-through from case 1.
+ chunk = reader.readByteArray();
+
+ if (!reader.isLastRead())
+ return false;
+
+ if (chunk != null) { // NOTE(review): a null chunk keeps state 2 and reads again - confirm it cannot occur.
+ if (tmpBuf.remaining() <= CHUNK_SIZE) {
+ ByteBuffer newTmpBuf =
ByteBuffer.allocateDirect(tmpBuf.capacity() * 2);
+
+ tmpBuf.flip();
+
+ newTmpBuf.put(tmpBuf);
+
+ tmpBuf = newTmpBuf;
+ }
+
+ tmpBuf.put(chunk);
+
+ reader.decrementState(); // Back to state 1: the next iteration reads the next flag.
+
+ chunk = null;
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+
+ /**
+ * Reads {@code dataSize} bytes from the buffer and deflates them with the configured compression level.
+ *
+ * @param buf Buffer.
+ * @return Compressed data.
+ * @throws IgniteException If the underlying stream fails with an {@link IOException}.
+ */
+ private byte[] compress(ByteBuffer buf) {
+ byte[] data = new byte[dataSize];
+
+ buf.get(data);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+ Deflater deflater = new Deflater(compressionLvl, true); // nowrap = true, matches Inflater(true) on read.
+
+ try (DeflaterOutputStream dos = new DeflaterOutputStream(baos,
deflater)) {
+ dos.write(data);
+ dos.finish();
+ }
+ catch (IOException ex) {
+ throw new IgniteException(ex);
+ }
+ finally {
+ deflater.end();
+ }
+
+ return baos.toByteArray();
+ }
+
+ /**
+ * Inflates the bytes accumulated in the temporary buffer and releases the buffer.
+ *
+ * @return Uncompressed data, or {@code null} if there is no temporary buffer.
+ * @throws IgniteException If the underlying stream fails with an {@link IOException}.
+ */
+ private byte[] uncompress() {
+ if (tmpBuf == null)
+ return null;
+
+ byte[] uncompressedData;
+
+ Inflater inflater = new Inflater(true);
+
+ byte[] bytes = new byte[tmpBuf.position()];
+
+ tmpBuf.flip();
+ tmpBuf.get(bytes);
+
+ try (InflaterInputStream iis = new InflaterInputStream(new
ByteArrayInputStream(bytes), inflater)) {
+ uncompressedData = iis.readAllBytes();
+ }
+ catch (IOException ex) {
+ throw new IgniteException(ex);
+ }
+ finally {
+ inflater.end();
+ }
+
+ assert uncompressedData != null;
+ assert uncompressedData.length == dataSize : "Expected=" + dataSize +
", actual=" + uncompressedData.length;
+
+ tmpBuf = null; // NOTE(review): the size check above is assert-only, so it is skipped when assertions are off.
+
+ return uncompressedData;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CompressedMessage.class, this);
+ }
+
+ /** Byte reader returning consecutive copies of the input data of at most {@link #CHUNK_SIZE} bytes each. */
+ private static class ChunkedByteReader {
+ /** Input data. */
+ private final byte[] inputData;
+
+ /** Current position. */
+ private int pos;
+
+ /** @param inputData Input data to split into chunks. */
+ ChunkedByteReader(byte[] inputData) {
+ this.inputData = inputData;
+ }
+
+ /** @return Next chunk of bytes or {@code null} if all the input data has been returned. */
+ byte[] nextChunk() {
+ if (pos >= inputData.length)
+ return null;
+
+ int curChunkSize = Math.min(inputData.length - pos, CHUNK_SIZE);
+
+ byte[] chunk = new byte[curChunkSize];
+
+ System.arraycopy(inputData, pos, chunk, 0, curChunkSize);
+
+ pos += curChunkSize;
+
+ return chunk;
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index b3032940866..07785b6913f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -433,7 +433,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
else {
formatter = new MessageFormatter() {
@Override public MessageWriter writer(MessageFactory
msgFactory) {
- return new DirectMessageWriter(msgFactory);
+ return new DirectMessageWriter(msgFactory,
ctx.config().getNetworkCompressionLevel());
}
@Override public MessageReader reader(MessageFactory
msgFactory) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7ccc0ef4a76..8756e664c1e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -178,6 +178,10 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessageSerializer;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeIdSerializer;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMapSerializer;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMapSerializer;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageSerializer;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -317,6 +321,8 @@ import
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeployment
import
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultSerializer;
import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.GridByteArrayListSerializer;
+import org.apache.ignite.internal.util.GridPartitionStateMap;
+import org.apache.ignite.internal.util.GridPartitionStateMapSerializer;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.UUIDCollectionMessageSerializer;
import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
@@ -343,6 +349,8 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
// -54 is reserved for SQL.
+ // We don't use the code-generated serializer for CompressedMessage -
serialization is highly customized.
+ factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new);
factory.register((short)-100, ErrorMessage::new, new
ErrorMessageSerializer());
factory.register((short)-65, TxInfo::new, new TxInfoSerializer());
factory.register((short)-64, TxEntriesInfo::new, new
TxEntriesInfoSerializer());
@@ -530,6 +538,9 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
new IgniteDhtPartitionsToReloadMapSerializer());
factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new
IntLongMapSerializer());
factory.register(IndexKeyTypeMessage.TYPE_CODE,
IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer());
+ factory.register(GridPartitionStateMap.TYPE_CODE,
GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
+ factory.register(GridDhtPartitionMap.TYPE_CODE,
GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
+ factory.register(GridDhtPartitionFullMap.TYPE_CODE,
GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5918023afff..71a8f655737 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1288,7 +1288,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
) {
long time = System.currentTimeMillis();
- GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true,
null, null, null, null, grps);
+ GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null,
null, null, null, grps);
m.topologyVersion(msgTopVer);
@@ -1342,8 +1342,6 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
/**
* Creates partitions full message for all cache groups.
*
- * @param compress {@code True} if possible to compress message (properly
work only if prepareMarshall/
- * finishUnmarshall methods are called).
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
* @param partHistSuppliers Partition history suppliers map.
@@ -1351,7 +1349,6 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
* @return Message.
*/
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
- boolean compress,
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -1359,14 +1356,12 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
) {
Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
- return createPartitionsFullMessage(compress, exchId, lastVer,
partHistSuppliers, partsToReload, grps);
+ return createPartitionsFullMessage(exchId, lastVer, partHistSuppliers,
partsToReload, grps);
}
/**
* Creates partitions full message for selected cache groups.
*
- * @param compress {@code True} if possible to compress message (properly
work only if prepareMarshall/
- * finishUnmarshall methods are called).
* @param exchId Non-null exchange ID if message is created for exchange.
* @param lastVer Last version.
* @param partHistSuppliers Partition history suppliers map.
@@ -1375,7 +1370,6 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
* @return Message.
*/
public GridDhtPartitionsFullMessage createPartitionsFullMessage(
- boolean compress,
@Nullable final GridDhtPartitionExchangeId exchId,
@Nullable GridCacheVersion lastVer,
@Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -1387,8 +1381,6 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
final GridDhtPartitionsFullMessage m =
new GridDhtPartitionsFullMessage(exchId, lastVer, ver,
partHistSuppliers, partsToReload);
- m.compressed(compress);
-
final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new
HashMap<>();
Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>();
@@ -1406,7 +1398,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
if (locMap != null)
- addFullPartitionsMap(m, dupData, compress, grp.groupId(),
locMap, affCache.similarAffinityKey());
+ addFullPartitionsMap(m, dupData, grp.groupId(), locMap,
affCache.similarAffinityKey());
Map<Integer, Long> partSizesMap = grp.topology().globalPartSizes();
@@ -1426,7 +1418,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
GridDhtPartitionFullMap map = top.partitionMap(true);
if (map != null)
- addFullPartitionsMap(m, dupData, compress, top.groupId(), map,
top.similarAffinityKey());
+ addFullPartitionsMap(m, dupData, top.groupId(), map,
top.similarAffinityKey());
if (exchId != null) {
m.addPartitionUpdateCounters(top.groupId(),
top.fullUpdateCounters());
@@ -1449,14 +1441,12 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
/**
* @param m Message.
* @param dupData Duplicated data map.
- * @param compress {@code True} if need check for duplicated partition
state data.
* @param grpId Cache group ID.
* @param map Map to add.
* @param affKey Cache affinity key.
*/
private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
- boolean compress,
Integer grpId,
GridDhtPartitionFullMap map,
Object affKey) {
@@ -1464,7 +1454,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
Integer dupDataCache = null;
- if (compress && affKey != null && !m.containsGroup(grpId)) {
+ if (affKey != null && !m.containsGroup(grpId)) {
T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
if (state0 != null && state0.get2().partitionStateEquals(map)) {
@@ -1556,8 +1546,7 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
) {
GridDhtPartitionsSingleMessage m = new
GridDhtPartitionsSingleMessage(exchangeId,
clientOnlyExchange,
- cctx.versions().last(),
- true);
+ cctx.versions().last());
Map<Object, T2<Integer, GridPartitionStateMap>> dupData = new
HashMap<>();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 068d5a3f5bb..44e710d13ae 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -21,30 +21,45 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.AbstractMap;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.NotNull;
/**
* Full partition map from all nodes.
*/
public class GridDhtPartitionFullMap
- extends HashMap<UUID, GridDhtPartitionMap> implements
Comparable<GridDhtPartitionFullMap>, Externalizable {
+ extends AbstractMap<UUID, GridDhtPartitionMap> implements
Comparable<GridDhtPartitionFullMap>, Externalizable, Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 519;
+
/** */
private static final long serialVersionUID = 0L;
/** Node ID. */
- private UUID nodeId;
+ @Order(0)
+ UUID nodeId;
/** Node order. */
- private long nodeOrder;
+ @Order(1)
+ long nodeOrder;
/** Update sequence number. */
- private long updateSeq;
+ @Order(2)
+ long updateSeq;
+
+ /** Partition map. */
+ @Order(3)
+ Map<UUID, GridDhtPartitionMap> map;
/**
* @param nodeId Node ID.
@@ -59,6 +74,7 @@ public class GridDhtPartitionFullMap
this.nodeId = nodeId;
this.nodeOrder = nodeOrder;
this.updateSeq = updateSeq;
+ map = new HashMap<>();
}
/**
@@ -77,6 +93,7 @@ public class GridDhtPartitionFullMap
this.nodeId = nodeId;
this.nodeOrder = nodeOrder;
this.updateSeq = updateSeq;
+ map = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionMap> e : m.entrySet()) {
GridDhtPartitionMap part = e.getValue();
@@ -96,7 +113,7 @@ public class GridDhtPartitionFullMap
* @param updateSeq Update sequence.
*/
public GridDhtPartitionFullMap(GridDhtPartitionFullMap m, long updateSeq) {
- super(m);
+ map = new HashMap<>(m);
nodeId = m.nodeId;
nodeOrder = m.nodeOrder;
@@ -107,7 +124,7 @@ public class GridDhtPartitionFullMap
* Empty constructor required for {@link Externalizable}.
*/
public GridDhtPartitionFullMap() {
- // No-op.
+ map = new HashMap<>();
}
/**
@@ -185,7 +202,7 @@ public class GridDhtPartitionFullMap
out.writeLong(nodeOrder);
out.writeLong(updateSeq);
- U.writeMap(out, this);
+ U.writeMap(out, map);
}
/** {@inheritDoc} */
@@ -195,7 +212,12 @@ public class GridDhtPartitionFullMap
nodeOrder = in.readLong();
updateSeq = in.readLong();
- putAll(U.<UUID, GridDhtPartitionMap>readMap(in));
+ map = new HashMap<>();
+
+ Map<UUID, GridDhtPartitionMap> map0 = U.readMap(in);
+
+ if (map0 != null)
+ map.putAll(map0);
}
/** {@inheritDoc} */
@@ -260,4 +282,69 @@ public class GridDhtPartitionFullMap
return Long.compare(updateSeq, o.updateSeq);
}
+
+ /** {@inheritDoc} */
+ @Override public GridDhtPartitionMap put(UUID key, GridDhtPartitionMap
val) {
+ return map.put(key, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putAll(Map<? extends UUID, ? extends
GridDhtPartitionMap> m) {
+ map.putAll(m);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtPartitionMap get(Object key) {
+ return map.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtPartitionMap remove(Object key) {
+ return map.remove(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return map.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsValue(Object val) {
+ return map.containsValue(val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsKey(Object key) {
+ return map.containsKey(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() {
+ map.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Set<UUID> keySet() {
+ return map.keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Collection<GridDhtPartitionMap> values() {
+ return map.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull Set<Entry<UUID, GridDhtPartitionMap>> entrySet()
{
+ return map.entrySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index 3cbf75767e7..b343d55d11b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -26,31 +26,40 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
import static
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
/**
* Partition map from single node.
*/
-public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>,
Externalizable {
+public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>,
Externalizable, Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 518;
+
/** */
private static final long serialVersionUID = 0L;
/** Node ID. */
+ @Order(0)
protected UUID nodeId;
/** Update sequence number. */
+ @Order(1)
protected long updateSeq;
/** Topology version. */
+ @Order(2)
protected AffinityTopologyVersion top;
/** */
+ @Order(value = 3, method = "map")
protected GridPartitionStateMap map;
/** */
@@ -197,6 +206,17 @@ public class GridDhtPartitionMap implements
Comparable<GridDhtPartitionMap>, Ext
return map;
}
+ /**
+ * @param map Partitions state map; its entries are re-added one by one through {@code put}, {@code null} adds none.
+ */
+ public void map(GridPartitionStateMap map) {
+ this.map = new GridPartitionStateMap();
+
+ if (map != null)
+ for (Map.Entry<Integer, GridDhtPartitionState> entry :
map.entrySet())
+ put(entry.getKey(), entry.getValue());
+ }
+
/**
* @return Node ID.
*/
@@ -337,4 +357,9 @@ public class GridDhtPartitionMap implements
Comparable<GridDhtPartitionMap>, Ext
@Override public String toString() {
return S.toString(GridDhtPartitionMap.class, this, "top", top,
"updateSeq", updateSeq, "size", size());
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index c03ef06bac7..6170e30e20a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -28,9 +28,6 @@ import org.jetbrains.annotations.Nullable;
* Request for single partition info.
*/
public abstract class GridDhtPartitionsAbstractMessage extends
GridCacheMessage {
- /** */
- private static final byte COMPRESSED_FLAG_MASK = 0x01;
-
/** */
private static final byte RESTORE_STATE_FLAG_MASK = 0x02;
@@ -107,20 +104,6 @@ public abstract class GridDhtPartitionsAbstractMessage
extends GridCacheMessage
return lastVer;
}
- /**
- * @return {@code True} if message data is compressed.
- */
- public final boolean compressed() {
- return (flags & COMPRESSED_FLAG_MASK) != 0;
- }
-
- /**
- * @param compressed {@code True} if message data is compressed.
- */
- public final void compressed(boolean compressed) {
- flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) :
(byte)(flags & ~COMPRESSED_FLAG_MASK);
- }
-
/**
* @param restoreState Restore exchange state flag.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3456d33cbab..abf0d704c73 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2117,8 +2117,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange()
&& exchangeLocE != null)) {
msg = new GridDhtPartitionsSingleMessage(exchangeId(),
cctx.kernalContext().clientNode(),
- cctx.versions().last(),
- true);
+ cctx.versions().last());
}
else {
msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
@@ -2165,14 +2164,12 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
}
/**
- * @param compress Message compress flag.
* @return Message.
*/
- private GridDhtPartitionsFullMessage createPartitionsMessage(boolean
compress) {
+ private GridDhtPartitionsFullMessage createPartitionsMessage() {
GridCacheVersion last = lastVer.get();
GridDhtPartitionsFullMessage m =
cctx.exchange().createPartitionsFullMessage(
- compress,
exchangeId(),
last != null ? last : cctx.versions().last(),
partHistSuppliers,
@@ -2958,7 +2955,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
onDone(null, reconnectEx);
- GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true);
+ GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage();
fullMsg.setErrorsMap(exchangeGlobalExceptions);
@@ -3119,7 +3116,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
return;
}
- GridDhtPartitionsFullMessage msg =
createPartitionsMessage(true);
+ GridDhtPartitionsFullMessage msg =
createPartitionsMessage();
msg.rebalanced(rebalanced());
@@ -3289,7 +3286,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
Map<Integer, Map<Integer, List<UUID>>> assignmentChange =
fut.get();
- GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+ GridDhtPartitionsFullMessage m = createPartitionsMessage();
CacheAffinityChangeMessage msg = new
CacheAffinityChangeMessage(exchId, m, assignmentChange);
@@ -3858,7 +3855,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
cctx.versions().onExchange(lastVer.get().order());
- GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+ GridDhtPartitionsFullMessage msg = createPartitionsMessage();
if (!cctx.affinity().rebalanceRequired() && !deactivateCluster())
msg.rebalanced(true);
@@ -4420,8 +4417,7 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
if (dynamicCacheStartExchange() && exchangeLocE != null) {
res = new
GridDhtPartitionsSingleMessage(msg.restoreExchangeId(),
cctx.kernalContext().clientNode(),
- cctx.versions().last(),
- true);
+ cctx.versions().last());
res.setError(exchangeLocE);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0a6da4ecc88..872629ddf19 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -29,21 +27,19 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.Compress;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -60,13 +56,10 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
@GridToStringInclude
private Map<Integer, GridDhtPartitionFullMap> parts;
- /**
- * Serialized local partitions.
- * <p>
- * TODO Remove this field after completing task IGNITE-26976.
- */
+ /** Partitions without duplicated data. */
@Order(6)
- byte[] partsBytes;
+ @Compress
+ Map<Integer, GridDhtPartitionFullMap> locParts;
/** */
@Order(7)
@@ -74,16 +67,19 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
/** Partitions update counters. */
@Order(8)
+ @Compress
@GridToStringInclude
IgniteDhtPartitionCountersMap partCntrs;
/** Partitions history suppliers. */
@Order(9)
+ @Compress
@GridToStringInclude
IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
/** Partitions that must be cleared and re-loaded. */
@Order(10)
+ @Compress
@GridToStringInclude
IgniteDhtPartitionsToReloadMap partsToReload;
@@ -104,8 +100,9 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
* All logic resides within getter and setter.
*/
@Order(value = 13, method = "errorMessages")
+ @Compress
@SuppressWarnings("unused")
- Map<UUID, ErrorMessage> errMsgs;
+ private Map<UUID, ErrorMessage> errMsgs;
/** */
@Order(14)
@@ -162,25 +159,9 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg;
- if (parts != null) {
- cp.parts = new HashMap<>(parts.size());
-
- for (Map.Entry<Integer, GridDhtPartitionFullMap> e :
parts.entrySet()) {
- GridDhtPartitionFullMap val = e.getValue();
-
- cp.parts.put(e.getKey(), new GridDhtPartitionFullMap(
- val.nodeId(),
- val.nodeOrder(),
- val.updateSequence(),
- val,
- false));
- }
- }
- else
- cp.parts = null;
-
+ cp.parts = parts != null ? copyPartitionsMap(parts) : null;
cp.dupPartsData = dupPartsData;
- cp.partsBytes = partsBytes;
+ cp.locParts = locParts;
cp.partCntrs = partCntrs;
cp.partHistSuppliers = partHistSuppliers;
cp.partsToReload = partsToReload;
@@ -280,7 +261,6 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
parts.put(grpId, fullMap);
if (dupDataCache != null) {
- assert compressed();
assert parts.containsKey(dupDataCache);
if (dupPartsData == null)
@@ -419,37 +399,8 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
super.prepareMarshal(ctx);
- boolean marshal = !F.isEmpty(parts) && partsBytes == null;
-
- if (marshal) {
- // Reserve at least 2 threads for system operations.
- int parallelismLvl = U.availableThreadCount(ctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
-
- Collection<Object> objectsToMarshall = new ArrayList<>();
-
- if (!F.isEmpty(parts) && partsBytes == null)
- objectsToMarshall.add(parts);
-
- Collection<byte[]> marshalled = U.doInParallel(
- parallelismLvl,
- ctx.kernalContext().pools().getSystemExecutorService(),
- objectsToMarshall,
- new IgniteThrowableFunction<Object, byte[]>() {
- @Override public byte[] apply(Object payload) throws
IgniteCheckedException {
- byte[] marshalled = U.marshal(ctx, payload);
-
- if (compressed())
- marshalled = U.zip(marshalled,
ctx.gridConfig().getNetworkCompressionLevel());
-
- return marshalled;
- }
- });
-
- Iterator<byte[]> iter = marshalled.iterator();
-
- if (!F.isEmpty(parts) && partsBytes == null)
- partsBytes = iter.next();
- }
+ if (!F.isEmpty(parts) && locParts == null)
+ locParts = copyPartitionsMap(parts);
}
/**
@@ -470,37 +421,10 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
@Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());
-
- Collection<byte[]> objectsToUnmarshall = new ArrayList<>();
-
- // Reserve at least 2 threads for system operations.
- int parallelismLvl = U.availableThreadCount(ctx.kernalContext(),
GridIoPolicy.SYSTEM_POOL, 2);
-
- if (partsBytes != null && parts == null)
- objectsToUnmarshall.add(partsBytes);
-
- Collection<Object> unmarshalled = U.doInParallel(
- parallelismLvl,
- ctx.kernalContext().pools().getSystemExecutorService(),
- objectsToUnmarshall,
- new IgniteThrowableFunction<byte[], Object>() {
- @Override public Object apply(byte[] binary) throws
IgniteCheckedException {
- return compressed()
- ? U.unmarshalZip(ctx.marshaller(), binary, clsLdr)
- : U.unmarshal(ctx, binary, clsLdr);
- }
- }
- );
-
- Iterator<Object> iter = unmarshalled.iterator();
-
- if (partsBytes != null && parts == null) {
- parts = (Map<Integer, GridDhtPartitionFullMap>)iter.next();
+ if (locParts != null && parts == null) {
+ parts = copyPartitionsMap(locParts);
if (dupPartsData != null) {
- assert parts != null;
-
for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
GridDhtPartitionFullMap map1 = parts.get(e.getKey());
GridDhtPartitionFullMap map2 = parts.get(e.getValue());
@@ -588,7 +512,23 @@ public class GridDhtPartitionsFullMessage extends
GridDhtPartitionsAbstractMessa
* Cleans up resources to avoid excessive memory usage.
*/
public void cleanUp() {
- partsBytes = null;
+ locParts = null;
partCntrs = null;
}
+
+ /** Copies {@code src} into a new {@link HashMap}, rebuilding each value as a fresh {@link GridDhtPartitionFullMap}. */
+ private Map<Integer, GridDhtPartitionFullMap>
copyPartitionsMap(Map<Integer, GridDhtPartitionFullMap> src) {
+ Map<Integer, GridDhtPartitionFullMap> map = new HashMap<>(src.size());
+
+ for (Map.Entry<Integer, GridDhtPartitionFullMap> entry :
src.entrySet()) {
+ GridDhtPartitionFullMap val = entry.getValue();
+
+ map.put(
+ entry.getKey(),
+ new GridDhtPartitionFullMap(val.nodeId(), val.nodeOrder(),
val.updateSequence(), val, false)
+ );
+ }
+
+ return map;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 53d9918569a..840a0f2da54 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Compress;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -30,7 +31,6 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
/**
@@ -39,17 +39,11 @@ import org.jetbrains.annotations.Nullable;
* Sent in response to {@link GridDhtPartitionsSingleRequest} and during
processing partitions exchange future.
*/
public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMessage {
- /** Local partitions. Serialized as {@link #partsBytes}, may be
compressed. */
- @GridToStringInclude
- private Map<Integer, GridDhtPartitionMap> parts;
-
- /**
- * Serialized local partitions. Unmarshalled to {@link #parts}.
- * <p>
- * TODO Remove this field after completing task IGNITE-26976.
- */
+ /** Local partitions. */
@Order(6)
- byte[] partsBytes;
+ @Compress
+ @GridToStringInclude
+ Map<Integer, GridDhtPartitionMap> parts;
/** */
@Order(7)
@@ -57,16 +51,19 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
/** Partitions update counters. */
@Order(8)
+ @Compress
@GridToStringInclude
Map<Integer, CachePartitionPartialCountersMap> partCntrs;
/** Partitions sizes. */
@Order(9)
+ @Compress
@GridToStringInclude
Map<Integer, IntLongMap> partsSizes;
/** Partitions history reservation counters. */
@Order(10)
+ @Compress
@GridToStringInclude
Map<Integer, IntLongMap> partHistCntrs;
@@ -105,17 +102,13 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
* @param exchId Exchange ID.
* @param client Client message flag.
* @param lastVer Last version.
- * @param compress {@code True} if it is possible to use compression for
message.
*/
public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
boolean client,
- @Nullable GridCacheVersion lastVer,
- boolean compress
+ @Nullable GridCacheVersion lastVer
) {
super(exchId, lastVer);
- compressed(compress);
-
this.client = client;
}
@@ -166,7 +159,6 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
parts.put(cacheId, locMap);
if (dupDataCache != null) {
- assert compressed();
assert F.isEmpty(locMap.map());
assert parts.containsKey(dupDataCache);
@@ -303,36 +295,10 @@ public class GridDhtPartitionsSingleMessage extends
GridDhtPartitionsAbstractMes
this.exchangeStartTime = exchangeStartTime;
}
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (parts != null && partsBytes == null) {
- byte[] partsBytes0 = U.marshal(ctx, parts);
-
- if (compressed()) {
- try {
- partsBytes0 = U.zip(partsBytes0);
- }
- catch (IgniteCheckedException e) {
- U.error(ctx.logger(getClass()), "Failed to compress
partitions data: " + e, e);
- }
- }
-
- partsBytes = partsBytes0;
- }
- }
-
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (partsBytes != null && parts == null) {
- parts = compressed()
- ? U.unmarshalZip(ctx.marshaller(), partsBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()))
- : U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
- }
-
if (dupPartsData != null) {
assert parts != null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
index 855d3c79ef4..6db6f53ec62 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
@@ -24,17 +24,22 @@ import java.util.BitSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
+import org.apache.ignite.internal.Order;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.plugin.extensions.communication.Message;
/**
* Grid partition state map. States are encoded using bits.
* <p>
* Null values are prohibited.
*/
-public class GridPartitionStateMap extends AbstractMap<Integer,
GridDhtPartitionState> implements Serializable {
+public class GridPartitionStateMap extends AbstractMap<Integer,
GridDhtPartitionState> implements Serializable, Message {
/** Empty map. */
public static final GridPartitionStateMap EMPTY = new
GridPartitionStateMap(0);
+ /** Type code. */
+ public static final short TYPE_CODE = 517;
+
/** */
private static final long serialVersionUID = 0L;
@@ -55,10 +60,12 @@ public class GridPartitionStateMap extends
AbstractMap<Integer, GridDhtPartition
* The first element takes the first {@link GridPartitionStateMap#BITS}
bits in reverse order,
* the second element next {@link GridPartitionStateMap#BITS} bits in
reverse order, etc.
*/
- private final BitSet states;
+ @Order(0)
+ BitSet states;
/** */
- private int size;
+ @Order(1)
+ int size;
/** {@inheritDoc} */
@Override public Set<Entry<Integer, GridDhtPartitionState>> entrySet() {
@@ -243,4 +250,9 @@ public class GridPartitionStateMap extends
AbstractMap<Integer, GridDhtPartition
@Override public int hashCode() {
return 31 * states.hashCode() + size;
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index c8cad7c0df6..eda96716fac 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -199,7 +199,18 @@ public interface MessageReader {
* @param <T> Type of the message.
* @return Message.
*/
- public <T extends Message> T readMessage();
+ public default <T extends Message> T readMessage() {
+ return readMessage(false);
+ }
+
+ /**
+ * Reads nested message.
+ *
+ * @param compress Whether message should be decompressed.
+ * @param <T> Type of the message.
+ * @return Message.
+ */
+ public <T extends Message> T readMessage(boolean compress);
/**
* Reads {@link CacheObject}.
@@ -259,9 +270,25 @@ public interface MessageReader {
* @param <M> Type of the read map.
* @return Map.
*/
- // TODO: IGNITE-26329 — switch to the new readMap method without the flag
parameter
+ // TODO: IGNITE-26329 — switch to the new readMap method without the
linked flag parameter
+ public default <M extends Map<?, ?>> M readMap(MessageCollectionItemType
keyType,
+ MessageCollectionItemType valType, boolean linked) {
+ return readMap(keyType, valType, linked, false);
+ }
+
+ /**
+ * Reads map.
+ *
+ * @param keyType Map key type.
+ * @param valType Map value type.
+ * @param linked Whether {@link LinkedHashMap} should be created.
+ * @param compress Whether map should be decompressed.
+ * @param <M> Type of the read map.
+ * @return Map.
+ */
+ // TODO: IGNITE-26329 — switch to the new readMap method without the
linked flag parameter
public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
- MessageCollectionItemType valType, boolean linked);
+ MessageCollectionItemType valType, boolean linked, boolean compress);
/**
* Tells whether last invocation of any of {@code readXXX(...)}
@@ -284,6 +311,11 @@ public interface MessageReader {
*/
public void incrementState();
+ /**
+ * Decrements read state.
+ */
+ public void decrementState();
+
/**
* Callback called before inner message is read.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 43fda9736d9..e073ca83134 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -247,7 +247,18 @@ public interface MessageWriter {
* @param val Message.
* @return Whether value was fully written.
*/
- public boolean writeMessage(Message val);
+ public default boolean writeMessage(Message val) {
+ return writeMessage(val, false);
+ }
+
+ /**
+ * Writes nested message.
+ *
+ * @param val Message.
+ * @param compress Whether message should be compressed.
+ * @return Whether value was fully written.
+ */
+ public boolean writeMessage(Message val, boolean compress);
/**
* Writes {@link CacheObject}.
@@ -313,8 +324,24 @@ public interface MessageWriter {
* @param <V> Initial value types of the map to write.
* @return Whether value was fully written.
*/
+ public default <K, V> boolean writeMap(Map<K, V> map,
MessageCollectionItemType keyType,
+ MessageCollectionItemType valType) {
+ return writeMap(map, keyType, valType, false);
+ }
+
+ /**
+ * Writes map.
+ *
+ * @param map Map.
+ * @param keyType Map key type.
+ * @param valType Map value type.
+ * @param compress Whether map should be compressed.
+ * @param <K> Initial key types of the map to write.
+ * @param <V> Initial value types of the map to write.
+ * @return Whether value was fully written.
+ */
public <K, V> boolean writeMap(Map<K, V> map, MessageCollectionItemType
keyType,
- MessageCollectionItemType valType);
+ MessageCollectionItemType valType, boolean compress);
/**
* @return Whether header of current message is already written.
@@ -338,6 +365,11 @@ public interface MessageWriter {
*/
public void incrementState();
+ /**
+ * Decrements state.
+ */
+ public void decrementState();
+
/**
* Callback called before inner message is written.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index 95320a94f81..97a048a508d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -878,7 +878,7 @@ public class GridNioServerWrapper {
@Override public MessageWriter writer(GridNioSession ses)
throws IgniteCheckedException {
// Enable sending wait message for a communication
peer while context isn't initialized.
if (!stateProvider.spiContextAvailable())
- return new DirectMessageWriter(msgFactory);
+ return new DirectMessageWriter(msgFactory,
igniteCfg.getNetworkCompressionLevel());
final IgniteSpiContext ctx =
stateProvider.getSpiContextWithoutInitialLatch();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
index 7ddf1fa7e24..019364513a2 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
@@ -270,6 +270,40 @@ public class MessageProcessorTest {
assertThat(compilation).succeeded();
}
+ /**
+ * Negative test that verifies that compilation fails if the
CompressedMessage type is used in Message.
+ */
+ @Test
+ public void testCompressedMessageExplicitUsageFails() {
+ String errMsg = "CompressedMessage should not be used explicitly. To
compress the required field use the @Compress annotation.";
+
+ Compilation compilation = compile("TestCompressedMessage.java");
+
+ assertThat(compilation).failed();
+ assertThat(compilation).hadErrorContaining(errMsg);
+
+ compilation = compile("TestCollectionsCompressedMessage.java");
+
+ assertThat(compilation).failed();
+ assertThat(compilation).hadErrorContaining(errMsg);
+
+ compilation = compile("TestMapCompressedMessage.java");
+
+ assertThat(compilation).failed();
+ assertThat(compilation).hadErrorContaining(errMsg);
+ }
+
+ /**
+ * Negative test that verifies that compilation fails if the Compress
annotation is used for unsupported types.
+ */
+ @Test
+ public void testCompressAnnotationFailsForUnsupportedTypes() {
+ Compilation compilation =
compile("TestCompressUnsupportedTypeMessage.java");
+
+ assertThat(compilation).failed();
+ assertThat(compilation).hadErrorContaining("Compress annotation is
used for an unsupported type: java.util.List");
+ }
+
/** */
private Compilation compile(String... srcFiles) {
return compile(new MessageProcessor(), srcFiles);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
index 4875ec0a465..317ff20a961 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
@@ -286,7 +286,7 @@ public abstract class AbstractMessageSerializationTest {
}
/** {@inheritDoc} */
- @Override public boolean writeMessage(Message val) {
+ @Override public boolean writeMessage(Message val, boolean compress) {
return writeField(Message.class);
}
@@ -307,7 +307,7 @@ public abstract class AbstractMessageSerializationTest {
/** {@inheritDoc} */
@Override public <K, V> boolean writeMap(Map<K, V> map,
MessageCollectionItemType keyType,
- MessageCollectionItemType valType) {
+ MessageCollectionItemType valType, boolean compress) {
return writeField(Map.class);
}
@@ -329,6 +329,11 @@ public abstract class AbstractMessageSerializationTest {
++state;
}
+ /** {@inheritDoc} */
+ @Override public void decrementState() {
+ --state;
+ }
+
/** {@inheritDoc} */
@Override public void beforeInnerMessageWrite() {}
@@ -518,7 +523,7 @@ public abstract class AbstractMessageSerializationTest {
}
/** {@inheritDoc} */
- @Override public <T extends Message> T readMessage() {
+ @Override public <T extends Message> T readMessage(boolean compress) {
readField(Message.class);
return null;
@@ -568,7 +573,7 @@ public abstract class AbstractMessageSerializationTest {
/** {@inheritDoc} */
@Override public <M extends Map<?, ?>> M
readMap(MessageCollectionItemType keyType,
- MessageCollectionItemType valType, boolean linked) {
+ MessageCollectionItemType valType, boolean linked, boolean
compress) {
readField(Map.class);
return null;
@@ -594,6 +599,11 @@ public abstract class AbstractMessageSerializationTest {
++state;
}
+ /** {@inheritDoc} */
+ @Override public void decrementState() {
+ --state;
+ }
+
/** {@inheritDoc} */
@Override public void beforeInnerMessageRead() {}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
new file mode 100644
index 00000000000..270fe7e47b3
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.direct.state.DirectMessageState;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test for {@link CompressedMessage}. */
+public class CompressedMessageTest {
+ /** */
+ @Test
+ public void testWriteReadHugeMessage() {
+ MessageFactory msgFactory = new IgniteMessageFactoryImpl(new
MessageFactoryProvider[]{new GridIoMessageFactory()});
+
+ DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
+
+ ByteBuffer tmpBuf = ByteBuffer.allocate(4096);
+
+ writer.setBuffer(tmpBuf);
+
+ GridDhtPartitionsFullMessage fullMsg = fullMessage();
+
+ boolean finished = false;
+ boolean checkChunkCnt = true;
+
+ int cnt = 0;
+
+ ByteBuffer msgBuf = ByteBuffer.allocate(40_960);
+
+ while (!finished) {
+ finished = writer.writeMessage(fullMsg, true);
+
+ if (checkChunkCnt) {
+ DirectMessageState<?> state = U.field(writer, "state");
+ DirectByteBufferStream stream = U.field(state.item(),
"stream");
+
+ CompressedMessage compressedMsg = stream.compressedMessage();
+
+ assertTrue(compressedMsg.dataSize() > 0);
+
+ byte[] compressedData = U.field((Object)U.field(compressedMsg,
"chunkedReader"), "inputData");
+
+ assertTrue(compressedData.length >
CompressedMessage.CHUNK_SIZE * 2);
+
+ checkChunkCnt = false;
+ }
+
+ tmpBuf.flip();
+
+ msgBuf.put(tmpBuf);
+
+ tmpBuf.clear();
+
+ cnt++;
+ }
+
+ assertTrue(cnt > 2);
+
+ msgBuf.flip();
+
+ DirectMessageReader reader = new DirectMessageReader(msgFactory, null);
+
+ reader.setBuffer(msgBuf);
+
+ Message readMsg = reader.readMessage(true);
+
+ assertTrue(readMsg instanceof GridDhtPartitionsFullMessage);
+
+ assertEqualsFullMsg(fullMsg, (GridDhtPartitionsFullMessage)readMsg);
+ }
+
+ /** */
+ private GridDhtPartitionsFullMessage fullMessage() {
+ IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new
IgniteDhtPartitionHistorySuppliersMap();
+ IgniteDhtPartitionsToReloadMap partsToReload = new
IgniteDhtPartitionsToReloadMap();
+
+ for (int i = 0; i < 500; i++) {
+ UUID uuid = UUID.randomUUID();
+
+ partHistSuppliers.put(uuid, i, i + 1, i + 2);
+ partsToReload.put(uuid, i, i + 1);
+ }
+
+ return new GridDhtPartitionsFullMessage(null, null, new
AffinityTopologyVersion(0), partHistSuppliers, partsToReload);
+ }
+
+ /** */
+ private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected,
GridDhtPartitionsFullMessage actual) {
+ Map<UUID, PartitionReservationsMap> expHistSuppliers =
U.field(expected.partitionHistorySuppliers(), "map");
+ Map<UUID, PartitionReservationsMap> actHistSuppliers =
U.field(actual.partitionHistorySuppliers(), "map");
+
+ assertEquals(expHistSuppliers.size(), actHistSuppliers.size());
+
+ for (Map.Entry<UUID, PartitionReservationsMap> entry :
expHistSuppliers.entrySet())
+ assertEquals(entry.getValue().reservations(),
actHistSuppliers.get(entry.getKey()).reservations());
+
+ Map<UUID, CachePartitionsToReloadMap> expPartsToReload =
U.field((Object)U.field(expected, "partsToReload"), "map");
+ Map<UUID, CachePartitionsToReloadMap> actPartsToReload =
U.field((Object)U.field(actual, "partsToReload"), "map");
+
+ assertEquals(expPartsToReload.size(), actPartsToReload.size());
+
+ for (Map.Entry<UUID, CachePartitionsToReloadMap> entry :
expPartsToReload.entrySet()) {
+ Map<Integer, PartitionsToReload> expCachePartitions =
U.field(entry.getValue(), "map");
+ Map<Integer, PartitionsToReload> actCachePartitions =
U.field(actPartsToReload.get(entry.getKey()), "map");
+
+ assertEquals(expCachePartitions.size(), actCachePartitions.size());
+
+ for (Map.Entry<Integer, PartitionsToReload> partsEntry :
expCachePartitions.entrySet())
+ assertEquals(partsEntry.getValue().partitions(),
actCachePartitions.get(partsEntry.getKey()).partitions());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 4bb13394dcf..6763eee53fa 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.TransactionsMXBeanImplTest;
import
org.apache.ignite.internal.codegen.IgniteDataTransferObjectProcessorTest;
import org.apache.ignite.internal.codegen.MessageProcessorTest;
import
org.apache.ignite.internal.managers.communication.CacheEntryPredicateAdapterMessageTest;
+import org.apache.ignite.internal.managers.communication.CompressedMessageTest;
import org.apache.ignite.internal.managers.communication.DefaultEnumMapperTest;
import org.apache.ignite.internal.managers.communication.ErrorMessageSelfTest;
import
org.apache.ignite.internal.managers.communication.IndexKeyTypeMessageTest;
@@ -154,6 +155,7 @@ import org.junit.runners.Suite;
DefaultEnumMapperTest.class,
IndexKeyTypeMessageTest.class,
IgniteDataTransferObjectProcessorTest.class,
+ CompressedMessageTest.class
})
public class IgniteBasicTestSuite {
}
diff --git
a/modules/core/src/test/resources/codegen/TestCollectionsCompressedMessage.java
b/modules/core/src/test/resources/codegen/TestCollectionsCompressedMessage.java
new file mode 100644
index 00000000000..0eac72c3549
--- /dev/null
+++
b/modules/core/src/test/resources/codegen/TestCollectionsCompressedMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.List;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestCollectionsCompressedMessage implements Message {
+ @Order(0)
+ List<CompressedMessage> messageList;
+
+ public short directType() {
+ return 0;
+ }
+}
diff --git
a/modules/core/src/test/resources/codegen/TestCompressUnsupportedTypeMessage.java
b/modules/core/src/test/resources/codegen/TestCompressUnsupportedTypeMessage.java
new file mode 100644
index 00000000000..2c5e96bdab4
--- /dev/null
+++
b/modules/core/src/test/resources/codegen/TestCompressUnsupportedTypeMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.List;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestCompressUnsupportedTypeMessage implements Message {
+ @Order(0)
+ @Compress
+ List<String> message;
+
+ public short directType() {
+ return 0;
+ }
+}
\ No newline at end of file
diff --git a/modules/core/src/test/resources/codegen/TestCompressedMessage.java
b/modules/core/src/test/resources/codegen/TestCompressedMessage.java
new file mode 100644
index 00000000000..2d96ca53e70
--- /dev/null
+++ b/modules/core/src/test/resources/codegen/TestCompressedMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestCompressedMessage implements Message {
+ @Order(0)
+ CompressedMessage message;
+
+ public short directType() {
+ return 0;
+ }
+}
diff --git
a/modules/core/src/test/resources/codegen/TestMapCompressedMessage.java
b/modules/core/src/test/resources/codegen/TestMapCompressedMessage.java
new file mode 100644
index 00000000000..c76c5cc301f
--- /dev/null
+++ b/modules/core/src/test/resources/codegen/TestMapCompressedMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.Map;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestMapCompressedMessage implements Message {
+ @Order(0)
+ Map<String, CompressedMessage> stringMessageMap;
+
+ public short directType() {
+ return 0;
+ }
+}