This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 63892a7902f IGNITE-26976 Add compression capability to the 
serialization framework (#12527)
63892a7902f is described below

commit 63892a7902f3abccd350185e1a46be2304b334cc
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Feb 27 17:11:14 2026 +0500

    IGNITE-26976 Add compression capability to the serialization framework 
(#12527)
---
 .../java/org/apache/ignite/internal/Compress.java  |  30 ++
 .../internal/MessageSerializerGenerator.java       |  74 ++++-
 .../internal/direct/DirectMessageReader.java       |  80 +++++-
 .../internal/direct/DirectMessageWriter.java       | 108 +++++++-
 .../direct/stream/DirectByteBufferStream.java      |  37 +++
 .../managers/communication/CompressedMessage.java  | 303 +++++++++++++++++++++
 .../managers/communication/GridIoManager.java      |   2 +-
 .../communication/GridIoMessageFactory.java        |  11 +
 .../cache/GridCachePartitionExchangeManager.java   |  23 +-
 .../dht/preloader/GridDhtPartitionFullMap.java     | 103 ++++++-
 .../dht/preloader/GridDhtPartitionMap.java         |  27 +-
 .../GridDhtPartitionsAbstractMessage.java          |  17 --
 .../preloader/GridDhtPartitionsExchangeFuture.java |  18 +-
 .../preloader/GridDhtPartitionsFullMessage.java    | 124 +++------
 .../preloader/GridDhtPartitionsSingleMessage.java  |  52 +---
 .../internal/util/GridPartitionStateMap.java       |  18 +-
 .../extensions/communication/MessageReader.java    |  38 ++-
 .../extensions/communication/MessageWriter.java    |  36 ++-
 .../tcp/internal/GridNioServerWrapper.java         |   2 +-
 .../internal/codegen/MessageProcessorTest.java     |  34 +++
 .../AbstractMessageSerializationTest.java          |  18 +-
 .../communication/CompressedMessageTest.java       | 147 ++++++++++
 .../ignite/testsuites/IgniteBasicTestSuite.java    |   2 +
 .../codegen/TestCollectionsCompressedMessage.java  |  31 +++
 .../TestCompressUnsupportedTypeMessage.java        |  31 +++
 .../resources/codegen/TestCompressedMessage.java   |  30 ++
 .../codegen/TestMapCompressedMessage.java          |  31 +++
 27 files changed, 1203 insertions(+), 224 deletions(-)

diff --git 
a/modules/codegen/src/main/java/org/apache/ignite/internal/Compress.java 
b/modules/codegen/src/main/java/org/apache/ignite/internal/Compress.java
new file mode 100644
index 00000000000..65dea1348f8
--- /dev/null
+++ b/modules/codegen/src/main/java/org/apache/ignite/internal/Compress.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** This annotation indicates that this field will be compressed during 
serialization. */
+@Retention(RetentionPolicy.CLASS)
+@Target(ElementType.FIELD)
+public @interface Compress {
+    // No-op.
+}
diff --git 
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
 
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index 269407c73ee..51b756c49fb 100644
--- 
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++ 
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -88,6 +88,10 @@ public class MessageSerializerGenerator {
     /** */
     static final String DLFT_ENUM_MAPPER_CLS = 
"org.apache.ignite.plugin.extensions.communication.mappers.DefaultEnumMapper";
 
+    /** */
+    private static final String COMPRESSED_MSG_ERROR = "CompressedMessage 
should not be used explicitly. " +
+        "To compress the required field use the @Compress annotation.";
+
     /** Collection of lines for {@code writeTo} method. */
     private final List<String> write = new ArrayList<>();
 
@@ -320,6 +324,11 @@ public class MessageSerializerGenerator {
 
         TypeMirror type = field.asType();
 
+        boolean compress = field.getAnnotation(Compress.class) != null;
+
+        if (compress)
+            checkTypeForCompress(type);
+
         if (type.getKind().isPrimitive()) {
             String typeName = capitalizeOnlyFirst(type.getKind().name());
 
@@ -371,9 +380,15 @@ public class MessageSerializerGenerator {
 
                 
imports.add("org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType");
 
-                returnFalseIfWriteFailed(write, field, "writer.writeMap", 
getExpr,
-                    "MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(0)),
-                    "MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(1)));
+                List<String> args = new ArrayList<>();
+                args.add(getExpr);
+                args.add("MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(0)));
+                args.add("MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(1)));
+
+                if (compress)
+                    args.add("true"); // the value of the compress argument in 
the MessageWriter#writeMap method
+
+                returnFalseIfWriteFailed(write, field, "writer.writeMap", 
args.toArray(String[]::new));
             }
 
             else if (assignableFrom(type, 
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
@@ -385,8 +400,15 @@ public class MessageSerializerGenerator {
             else if (assignableFrom(type, 
type("org.apache.ignite.internal.util.GridLongList")))
                 returnFalseIfWriteFailed(write, field, 
"writer.writeGridLongList", getExpr);
 
-            else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
-                returnFalseIfWriteFailed(write, field, "writer.writeMessage", 
getExpr);
+            else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
+                if (sameType(type, 
"org.apache.ignite.internal.managers.communication.CompressedMessage"))
+                    throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
+
+                if (compress)
+                    returnFalseIfWriteFailed(write, field, 
"writer.writeMessage", getExpr, "true");
+                else
+                    returnFalseIfWriteFailed(write, field, 
"writer.writeMessage", getExpr);
+            }
 
             else if (assignableFrom(erasedType(type), 
type(Collection.class.getName()))) {
                 List<? extends TypeMirror> typeArgs = 
((DeclaredType)type).getTypeArguments();
@@ -530,6 +552,11 @@ public class MessageSerializerGenerator {
     private void returnFalseIfReadFailed(VariableElement field) throws 
Exception {
         TypeMirror type = field.asType();
 
+        boolean compress = field.getAnnotation(Compress.class) != null;
+
+        if (compress)
+            checkTypeForCompress(type);
+
         if (type.getKind().isPrimitive()) {
             String typeName = capitalizeOnlyFirst(type.getKind().name());
 
@@ -602,9 +629,15 @@ public class MessageSerializerGenerator {
 
                 assert typeArgs.size() == 2;
 
-                returnFalseIfReadFailed(field, "reader.readMap",
-                    "MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(0)),
-                    "MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(1)), "false");
+                List<String> args = new ArrayList<>();
+                args.add("MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(0)));
+                args.add("MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(1)));
+                args.add("false"); // the value of the linked argument in the 
MessageReader#readMap method
+
+                if (compress)
+                    args.add("true"); // the value of the compress argument in 
the MessageReader#readMap method
+
+                returnFalseIfReadFailed(field, "reader.readMap", 
args.toArray(String[]::new));
             }
 
             else if (assignableFrom(type, 
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
@@ -616,8 +649,15 @@ public class MessageSerializerGenerator {
             else if (assignableFrom(type, 
type("org.apache.ignite.internal.util.GridLongList")))
                 returnFalseIfReadFailed(field, "reader.readGridLongList");
 
-            else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
-                returnFalseIfReadFailed(field, "reader.readMessage");
+            else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
+                if (sameType(type, 
"org.apache.ignite.internal.managers.communication.CompressedMessage"))
+                    throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
+
+                if (compress)
+                    returnFalseIfReadFailed(field, "reader.readMessage", 
"true");
+                else
+                    returnFalseIfReadFailed(field, "reader.readMessage");
+            }
 
             else if (assignableFrom(erasedType(type), 
type(Collection.class.getName()))) {
                 List<? extends TypeMirror> typeArgs = 
((DeclaredType)type).getTypeArguments();
@@ -703,6 +743,9 @@ public class MessageSerializerGenerator {
 
             if (primitiveType != null)
                 return primitiveType.getKind().toString();
+
+            if (sameType(type, 
"org.apache.ignite.internal.managers.communication.CompressedMessage"))
+                throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
         }
 
         if (!assignableFrom(type, type(MESSAGE_INTERFACE)))
@@ -954,4 +997,15 @@ public class MessageSerializerGenerator {
 
         return sb.toString();
     }
+
+    /** Checks that the Compress annotation is used only for supported types: 
Map and Message. */
+    private void checkTypeForCompress(TypeMirror type) {
+        if (type.getKind() == TypeKind.DECLARED) {
+            if (assignableFrom(erasedType(type), type(Map.class.getName())) ||
+                assignableFrom(type, type(MESSAGE_INTERFACE)))
+                return;
+        }
+
+        throw new IllegalArgumentException("Compress annotation is used for an 
unsupported type: " + type);
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 24d97da7b2d..3217d7630a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -23,9 +23,11 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Function;
 import org.apache.ignite.internal.direct.state.DirectMessageState;
 import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -49,6 +51,12 @@ public class DirectMessageReader implements MessageReader {
     @GridToStringInclude
     private final DirectMessageState<StateItem> state;
 
+    /** Message factory. */
+    private final MessageFactory msgFactory;
+
+    /** Cache object processor. */
+    private final IgniteCacheObjectProcessor cacheObjProc;
+
     /** Buffer for reading. */
     private ByteBuffer buf;
 
@@ -60,6 +68,9 @@ public class DirectMessageReader implements MessageReader {
      * @param cacheObjProc Cache object processor.
      */
     public DirectMessageReader(final MessageFactory msgFactory, 
IgniteCacheObjectProcessor cacheObjProc) {
+        this.msgFactory = msgFactory;
+        this.cacheObjProc = cacheObjProc;
+
         state = new DirectMessageState<>(StateItem.class, new 
IgniteOutClosure<StateItem>() {
             @Override public StateItem apply() {
                 return new StateItem(msgFactory, cacheObjProc);
@@ -315,12 +326,21 @@ public class DirectMessageReader implements MessageReader 
{
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public <T extends Message> T readMessage() {
+    @Nullable @Override public <T extends Message> T readMessage(boolean 
compress) {
         DirectByteBufferStream stream = state.item().stream;
 
-        T msg = stream.readMessage(this);
+        T msg;
 
-        lastRead = stream.lastFinished();
+        if (compress)
+            msg = readCompressedMessageAndDeserialize(
+                stream,
+                tmpReader -> 
tmpReader.state.item().stream.readMessage(tmpReader)
+            );
+        else {
+            msg = stream.readMessage(this);
+
+            lastRead = stream.lastFinished();
+        }
 
         return msg;
     }
@@ -393,12 +413,21 @@ public class DirectMessageReader implements MessageReader 
{
 
     /** {@inheritDoc} */
     @Override public <M extends Map<?, ?>> M readMap(MessageCollectionItemType 
keyType,
-        MessageCollectionItemType valType, boolean linked) {
+        MessageCollectionItemType valType, boolean linked, boolean compress) {
         DirectByteBufferStream stream = state.item().stream;
 
-        M map = stream.readMap(keyType, valType, linked, this);
+        M map;
 
-        lastRead = stream.lastFinished();
+        if (compress)
+            map = readCompressedMessageAndDeserialize(
+                stream,
+                tmpReader -> tmpReader.state.item().stream.readMap(keyType, 
valType, linked, tmpReader)
+            );
+        else {
+            map = stream.readMap(keyType, valType, linked, this);
+
+            lastRead = stream.lastFinished();
+        }
 
         return map;
     }
@@ -418,6 +447,11 @@ public class DirectMessageReader implements MessageReader {
         state.item().state++;
     }
 
+    /** {@inheritDoc} */
+    @Override public void decrementState() {
+        state.item().state--;
+    }
+
     /** {@inheritDoc} */
     @Override public void beforeInnerMessageRead() {
         state.forward();
@@ -440,6 +474,40 @@ public class DirectMessageReader implements MessageReader {
         return S.toString(DirectMessageReader.class, this);
     }
 
+    /** @return Deserialized object. */
+    private <T> T readCompressedMessageAndDeserialize(DirectByteBufferStream 
stream, Function<DirectMessageReader, T> fun) {
+        Message msg = stream.readMessage(this);
+
+        lastRead = stream.lastFinished();
+
+        if (!lastRead || msg == null)
+            return null;
+
+        assert msg instanceof CompressedMessage : msg;
+
+        CompressedMessage msg0 = (CompressedMessage)msg;
+
+        if (msg0.dataSize() == 0)
+            return null;
+
+        byte[] uncompressed = msg0.uncompressed();
+
+        ByteBuffer tmpBuf = ByteBuffer.allocateDirect(uncompressed.length);
+
+        tmpBuf.put(uncompressed);
+        tmpBuf.flip();
+
+        DirectMessageReader tmpReader = new DirectMessageReader(msgFactory, 
cacheObjProc);
+
+        tmpReader.setBuffer(tmpBuf);
+
+        T res = fun.apply(tmpReader);
+
+        lastRead = tmpReader.state.item().stream.lastFinished();
+
+        return res;
+    }
+
     /**
      */
     private static class StateItem implements DirectMessageStateItem {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index 1da76aa14cf..2cb363f1bd4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -23,9 +23,11 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.direct.state.DirectMessageState;
 import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -40,19 +42,41 @@ import 
org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.configuration.IgniteConfiguration.DFLT_NETWORK_COMPRESSION;
+
 /**
  * Message writer implementation.
  */
 public class DirectMessageWriter implements MessageWriter {
+    /** Temporary buffer capacity.  */
+    private static final int TMP_BUF_CAPACITY = 10 * 1024;
+
     /** State. */
     @GridToStringInclude
     private final DirectMessageState<StateItem> state;
 
+    /** Message factory. */
+    private final MessageFactory msgFactory;
+
+    /** Compression level. Used only for {@link CompressedMessage}. */
+    private final int compressionLvl;
+
     /** Buffer for writing. */
     private ByteBuffer buf;
 
-    /** */
+    /** @param msgFactory Message factory. */
     public DirectMessageWriter(final MessageFactory msgFactory) {
+        this(msgFactory, DFLT_NETWORK_COMPRESSION);
+    }
+
+    /**
+     * @param msgFactory Message factory.
+     * @param compressionLvl Compression level.
+     */
+    public DirectMessageWriter(final MessageFactory msgFactory, final int 
compressionLvl) {
+        this.msgFactory = msgFactory;
+        this.compressionLvl = compressionLvl;
+
         state = new DirectMessageState<>(StateItem.class, new 
IgniteOutClosure<StateItem>() {
             @Override public StateItem apply() {
                 return new StateItem(msgFactory);
@@ -293,10 +317,17 @@ public class DirectMessageWriter implements MessageWriter 
{
     }
 
     /** {@inheritDoc} */
-    @Override public boolean writeMessage(@Nullable Message msg) {
+    @Override public boolean writeMessage(@Nullable Message msg, boolean 
compress) {
         DirectByteBufferStream stream = state.item().stream;
 
-        stream.writeMessage(msg, this);
+        if (compress)
+            writeCompressedMessage(
+                tmpWriter -> tmpWriter.state.item().stream.writeMessage(msg, 
tmpWriter),
+                msg == null,
+                stream
+            );
+        else
+            stream.writeMessage(msg, this);
 
         return stream.lastFinished();
     }
@@ -353,10 +384,17 @@ public class DirectMessageWriter implements MessageWriter 
{
 
     /** {@inheritDoc} */
     @Override public <K, V> boolean writeMap(Map<K, V> map, 
MessageCollectionItemType keyType,
-        MessageCollectionItemType valType) {
+        MessageCollectionItemType valType, boolean compress) {
         DirectByteBufferStream stream = state.item().stream;
 
-        stream.writeMap(map, keyType, valType, this);
+        if (compress)
+            writeCompressedMessage(
+                tmpWriter -> tmpWriter.state.item().stream.writeMap(map, 
keyType, valType, tmpWriter),
+                map == null,
+                stream
+            );
+        else
+            stream.writeMap(map, keyType, valType, this);
 
         return stream.lastFinished();
     }
@@ -381,6 +419,11 @@ public class DirectMessageWriter implements MessageWriter {
         state.item().state++;
     }
 
+    /** {@inheritDoc} */
+    @Override public void decrementState() {
+        state.item().state--;
+    }
+
     /** {@inheritDoc} */
     @Override public void beforeInnerMessageWrite() {
         state.forward();
@@ -403,6 +446,61 @@ public class DirectMessageWriter implements MessageWriter {
         return S.toString(DirectMessageWriter.class, this);
     }
 
+    /**
+     * @param consumer Consumer.
+     * @param isNull {@code True} if message is null.
+     * @param stream Byte buffer stream.
+     */
+    private void writeCompressedMessage(Consumer<DirectMessageWriter> 
consumer, boolean isNull, DirectByteBufferStream stream) {
+        if (isNull) {
+            stream.writeShort(Short.MIN_VALUE);
+
+            return;
+        }
+
+        if (!stream.serializeFinished()) {
+            ByteBuffer tmpBuf = ByteBuffer.allocateDirect(TMP_BUF_CAPACITY);
+
+            DirectMessageWriter tmpWriter = new 
DirectMessageWriter(msgFactory, compressionLvl);
+
+            tmpWriter.setBuffer(tmpBuf);
+
+            boolean finished;
+
+            do {
+                if (tmpBuf.remaining() <= tmpBuf.capacity() / 10) {
+                    byte[] bytes = new byte[tmpBuf.position()];
+
+                    tmpBuf.flip();
+                    tmpBuf.get(bytes);
+
+                    tmpBuf = ByteBuffer.allocateDirect(tmpBuf.capacity() * 2);
+
+                    tmpBuf.put(bytes);
+
+                    tmpWriter.setBuffer(tmpBuf);
+                }
+
+                consumer.accept(tmpWriter);
+
+                finished = tmpWriter.state.item().stream.lastFinished();
+            }
+            while (!finished);
+
+            tmpBuf.flip();
+
+            stream.compressedMessage(new CompressedMessage(tmpBuf, 
compressionLvl));
+            stream.serializeFinished(true);
+        }
+
+        stream.writeMessage(stream.compressedMessage(), this);
+
+        if (stream.lastFinished()) {
+            stream.compressedMessage(null);
+            stream.serializeFinished(false);
+        }
+    }
+
     /**
      */
     private static class StateItem implements DirectMessageStateItem {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
index 5c526bf6fd3..ca3ecfc7fbc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -339,6 +340,12 @@ public class DirectByteBufferStream {
     /** */
     private byte cacheObjType;
 
+    /** */
+    private CompressedMessage compressedMsg;
+
+    /** */
+    private boolean serializeFinished;
+
     /**
      * Constructror for stream used for writing messages.
      *
@@ -390,6 +397,36 @@ public class DirectByteBufferStream {
         return lastFinished;
     }
 
+    /**
+     * @return Compressed message.
+     */
+    public CompressedMessage compressedMessage() {
+        assert compressedMsg != null;
+
+        return compressedMsg;
+    }
+
+    /**
+     * @param compressedMsg Compressed message.
+     */
+    public void compressedMessage(CompressedMessage compressedMsg) {
+        this.compressedMsg = compressedMsg;
+    }
+
+    /**
+     * @return Whether last object was fully serialized.
+     */
+    public boolean serializeFinished() {
+        return serializeFinished;
+    }
+
+    /**
+     * @param serializeFinished {@code True} if last object was fully 
serialized.
+     */
+    public void serializeFinished(boolean serializeFinished) {
+        this.serializeFinished = serializeFinished;
+    }
+
     /**
      * @param val Value.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
new file mode 100644
index 00000000000..95565217f86
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Internal message used when transmitting fields annotated with @Compress 
over the network.
+ * <p>
+ * WARNING: CompressedMessage is not intended for explicit use in messages.
+ */
+public class CompressedMessage implements Message {
+    /** Type code. */
+    public static final short TYPE_CODE = -101;
+
+    /** Chunk size. */
+    static final int CHUNK_SIZE = 10 * 1024;
+
+    /** Reader buffer capacity. */
+    private static final int BUFFER_CAPACITY = 10 * CHUNK_SIZE;
+
+    /** Temporary buffer for compressed data received over the network. */
+    private ByteBuffer tmpBuf;
+
+    /** Raw data size. */
+    private int dataSize;
+
+    /** Chunked byte reader. */
+    private ChunkedByteReader chunkedReader;
+
+    /** Chunk. */
+    private byte[] chunk;
+
+    /** Flag indicating whether this is the last chunk. */
+    private boolean finalChunk;
+
+    /** Compression level. */
+    private int compressionLvl;
+
+    /** Constructor. */
+    public CompressedMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param buf Source buffer with serialized data.
+     * @param compressionLvl Compression level.
+     */
+    public CompressedMessage(ByteBuffer buf, int compressionLvl) {
+        dataSize = buf.remaining();
+        this.compressionLvl = compressionLvl;
+
+        if (dataSize > 0)
+            chunkedReader = new ChunkedByteReader(compress(buf));
+    }
+
+    /** @return Raw data size. */
+    public int dataSize() {
+        return dataSize;
+    }
+
+    /** @return Uncompressed data. */
+    public byte[] uncompressed() {
+        assert finalChunk;
+
+        return uncompress();
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        while (true) {
+            if (chunk == null && chunkedReader != null) {
+                chunk = chunkedReader.nextChunk();
+
+                finalChunk = (chunk == null);
+            }
+
+            switch (writer.state()) {
+                case 0:
+                    if (!writer.writeInt(dataSize))
+                        return false;
+
+                    writer.incrementState();
+
+                    if (dataSize == 0)
+                        return true;
+
+                case 1:
+                    if (!writer.writeBoolean(finalChunk))
+                        return false;
+
+                    writer.incrementState();
+
+                    if (finalChunk)
+                        return true;
+
+                case 2:
+                    if (!writer.writeByteArray(chunk))
+                        return false;
+
+                    chunk = null;
+
+                    writer.decrementState();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (tmpBuf == null)
+            tmpBuf = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
+
+        assert chunk == null : chunk;
+
+        while (true) {
+            switch (reader.state()) {
+                case 0:
+                    dataSize = reader.readInt();
+
+                    if (!reader.isLastRead())
+                        return false;
+
+                    if (dataSize == 0)
+                        return true;
+
+                    reader.incrementState();
+
+                case 1:
+                    finalChunk = reader.readBoolean();
+
+                    if (!reader.isLastRead())
+                        return false;
+
+                    if (finalChunk)
+                        return true;
+
+                    reader.incrementState();
+
+                case 2:
+                    chunk = reader.readByteArray();
+
+                    if (!reader.isLastRead())
+                        return false;
+
+                    if (chunk != null) {
+                        if (tmpBuf.remaining() <= CHUNK_SIZE) {
+                            ByteBuffer newTmpBuf = 
ByteBuffer.allocateDirect(tmpBuf.capacity() * 2);
+
+                            tmpBuf.flip();
+
+                            newTmpBuf.put(tmpBuf);
+
+                            tmpBuf = newTmpBuf;
+                        }
+
+                        tmpBuf.put(chunk);
+
+                        reader.decrementState();
+
+                        chunk = null;
+                    }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
+
+    /**
+     * @param buf Buffer.
+     * @return Compressed data.
+     */
+    private byte[] compress(ByteBuffer buf) {
+        byte[] data = new byte[dataSize];
+
+        buf.get(data);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(data.length);
+        Deflater deflater = new Deflater(compressionLvl, true);
+
+        try (DeflaterOutputStream dos = new DeflaterOutputStream(baos, 
deflater)) {
+            dos.write(data);
+            dos.finish();
+        }
+        catch (IOException ex) {
+            throw new IgniteException(ex);
+        }
+        finally {
+            deflater.end();
+        }
+
+        return baos.toByteArray();
+    }
+
+    /** @return Uncompressed data. */
+    private byte[] uncompress() {
+        if (tmpBuf == null)
+            return null;
+
+        byte[] uncompressedData;
+
+        Inflater inflater = new Inflater(true);
+
+        byte[] bytes = new byte[tmpBuf.position()];
+
+        tmpBuf.flip();
+        tmpBuf.get(bytes);
+
+        try (InflaterInputStream iis = new InflaterInputStream(new 
ByteArrayInputStream(bytes), inflater)) {
+            uncompressedData = iis.readAllBytes();
+        }
+        catch (IOException ex) {
+            throw new IgniteException(ex);
+        }
+        finally {
+            inflater.end();
+        }
+
+        assert uncompressedData != null;
+        assert uncompressedData.length == dataSize : "Expected=" + dataSize + 
", actual=" + uncompressedData.length;
+
+        tmpBuf = null;
+
+        return uncompressedData;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(CompressedMessage.class, this);
+    }
+
+    /** Byte reader returning data chunks of predefined size. */
+    private static class ChunkedByteReader {
+        /** Input data. */
+        private final byte[] inputData;
+
+        /** Current position. */
+        private int pos;
+
+        /** Constructor. */
+        ChunkedByteReader(byte[] inputData) {
+            this.inputData = inputData;
+        }
+
+        /** @return Next chunk of bytes or null. */
+        byte[] nextChunk() {
+            if (pos >= inputData.length)
+                return null;
+
+            int curChunkSize = Math.min(inputData.length - pos, CHUNK_SIZE);
+
+            byte[] chunk = new byte[curChunkSize];
+
+            System.arraycopy(inputData, pos, chunk, 0, curChunkSize);
+
+            pos += curChunkSize;
+
+            return chunk;
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index b3032940866..07785b6913f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -433,7 +433,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
         else {
             formatter = new MessageFormatter() {
                 @Override public MessageWriter writer(MessageFactory 
msgFactory) {
-                    return new DirectMessageWriter(msgFactory);
+                    return new DirectMessageWriter(msgFactory, 
ctx.config().getNetworkCompressionLevel());
                 }
 
                 @Override public MessageReader reader(MessageFactory 
msgFactory) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7ccc0ef4a76..8756e664c1e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -178,6 +178,10 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessageSerializer;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeIdSerializer;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMapSerializer;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMapSerializer;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageSerializer;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
@@ -317,6 +321,8 @@ import 
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeployment
 import 
org.apache.ignite.internal.processors.service.ServiceSingleNodeDeploymentResultSerializer;
 import org.apache.ignite.internal.util.GridByteArrayList;
 import org.apache.ignite.internal.util.GridByteArrayListSerializer;
+import org.apache.ignite.internal.util.GridPartitionStateMap;
+import org.apache.ignite.internal.util.GridPartitionStateMapSerializer;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.UUIDCollectionMessageSerializer;
 import org.apache.ignite.internal.util.distributed.SingleNodeMessage;
@@ -343,6 +349,8 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
     /** {@inheritDoc} */
     @Override public void registerAll(MessageFactory factory) {
         // -54 is reserved for SQL.
+        // We don't use the code‑generated serializer for CompressedMessage - 
serialization is highly customized.
+        factory.register(CompressedMessage.TYPE_CODE, CompressedMessage::new);
         factory.register((short)-100, ErrorMessage::new, new 
ErrorMessageSerializer());
         factory.register((short)-65, TxInfo::new, new TxInfoSerializer());
         factory.register((short)-64, TxEntriesInfo::new, new 
TxEntriesInfoSerializer());
@@ -530,6 +538,9 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
             new IgniteDhtPartitionsToReloadMapSerializer());
         factory.register(IntLongMap.TYPE_CODE, IntLongMap::new, new 
IntLongMapSerializer());
         factory.register(IndexKeyTypeMessage.TYPE_CODE, 
IndexKeyTypeMessage::new, new IndexKeyTypeMessageSerializer());
+        factory.register(GridPartitionStateMap.TYPE_CODE, 
GridPartitionStateMap::new, new GridPartitionStateMapSerializer());
+        factory.register(GridDhtPartitionMap.TYPE_CODE, 
GridDhtPartitionMap::new, new GridDhtPartitionMapSerializer());
+        factory.register(GridDhtPartitionFullMap.TYPE_CODE, 
GridDhtPartitionFullMap::new, new GridDhtPartitionFullMapSerializer());
 
         // [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
         // [120..123] - DR
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5918023afff..71a8f655737 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1288,7 +1288,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     ) {
         long time = System.currentTimeMillis();
 
-        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, 
null, null, null, null, grps);
+        GridDhtPartitionsFullMessage m = createPartitionsFullMessage(null, 
null, null, null, grps);
 
         m.topologyVersion(msgTopVer);
 
@@ -1342,8 +1342,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /**
      * Creates partitions full message for all cache groups.
      *
-     * @param compress {@code True} if possible to compress message (properly 
work only if prepareMarshall/
-     * finishUnmarshall methods are called).
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
      * @param partHistSuppliers Partition history suppliers map.
@@ -1351,7 +1349,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return Message.
      */
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
-        boolean compress,
         @Nullable final GridDhtPartitionExchangeId exchId,
         @Nullable GridCacheVersion lastVer,
         @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -1359,14 +1356,12 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     ) {
         Collection<CacheGroupContext> grps = cctx.cache().cacheGroups();
 
-        return createPartitionsFullMessage(compress, exchId, lastVer, 
partHistSuppliers, partsToReload, grps);
+        return createPartitionsFullMessage(exchId, lastVer, partHistSuppliers, 
partsToReload, grps);
     }
 
     /**
      * Creates partitions full message for selected cache groups.
      *
-     * @param compress {@code True} if possible to compress message (properly 
work only if prepareMarshall/
-     *     finishUnmarshall methods are called).
      * @param exchId Non-null exchange ID if message is created for exchange.
      * @param lastVer Last version.
      * @param partHistSuppliers Partition history suppliers map.
@@ -1375,7 +1370,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * @return Message.
      */
     public GridDhtPartitionsFullMessage createPartitionsFullMessage(
-        boolean compress,
         @Nullable final GridDhtPartitionExchangeId exchId,
         @Nullable GridCacheVersion lastVer,
         @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
@@ -1387,8 +1381,6 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         final GridDhtPartitionsFullMessage m =
             new GridDhtPartitionsFullMessage(exchId, lastVer, ver, 
partHistSuppliers, partsToReload);
 
-        m.compressed(compress);
-
         final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new 
HashMap<>();
 
         Map<Integer, Map<Integer, Long>> partsSizes = new HashMap<>();
@@ -1406,7 +1398,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
 
             if (locMap != null)
-                addFullPartitionsMap(m, dupData, compress, grp.groupId(), 
locMap, affCache.similarAffinityKey());
+                addFullPartitionsMap(m, dupData, grp.groupId(), locMap, 
affCache.similarAffinityKey());
 
             Map<Integer, Long> partSizesMap = grp.topology().globalPartSizes();
 
@@ -1426,7 +1418,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
             GridDhtPartitionFullMap map = top.partitionMap(true);
 
             if (map != null)
-                addFullPartitionsMap(m, dupData, compress, top.groupId(), map, 
top.similarAffinityKey());
+                addFullPartitionsMap(m, dupData, top.groupId(), map, 
top.similarAffinityKey());
 
             if (exchId != null) {
                 m.addPartitionUpdateCounters(top.groupId(), 
top.fullUpdateCounters());
@@ -1449,14 +1441,12 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /**
      * @param m Message.
      * @param dupData Duplicated data map.
-     * @param compress {@code True} if need check for duplicated partition 
state data.
      * @param grpId Cache group ID.
      * @param map Map to add.
      * @param affKey Cache affinity key.
      */
     private void addFullPartitionsMap(GridDhtPartitionsFullMessage m,
         Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData,
-        boolean compress,
         Integer grpId,
         GridDhtPartitionFullMap map,
         Object affKey) {
@@ -1464,7 +1454,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         Integer dupDataCache = null;
 
-        if (compress && affKey != null && !m.containsGroup(grpId)) {
+        if (affKey != null && !m.containsGroup(grpId)) {
             T2<Integer, GridDhtPartitionFullMap> state0 = dupData.get(affKey);
 
             if (state0 != null && state0.get2().partitionStateEquals(map)) {
@@ -1556,8 +1546,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     ) {
         GridDhtPartitionsSingleMessage m = new 
GridDhtPartitionsSingleMessage(exchangeId,
             clientOnlyExchange,
-            cctx.versions().last(),
-            true);
+            cctx.versions().last());
 
         Map<Object, T2<Integer, GridPartitionStateMap>> dupData = new 
HashMap<>();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
index 068d5a3f5bb..44e710d13ae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionFullMap.java
@@ -21,30 +21,45 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.AbstractMap;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.NotNull;
 
 /**
  * Full partition map from all nodes.
  */
 public class GridDhtPartitionFullMap
-    extends HashMap<UUID, GridDhtPartitionMap> implements 
Comparable<GridDhtPartitionFullMap>, Externalizable {
+    extends AbstractMap<UUID, GridDhtPartitionMap> implements 
Comparable<GridDhtPartitionFullMap>, Externalizable, Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 519;
+
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Node ID. */
-    private UUID nodeId;
+    @Order(0)
+    UUID nodeId;
 
     /** Node order. */
-    private long nodeOrder;
+    @Order(1)
+    long nodeOrder;
 
     /** Update sequence number. */
-    private long updateSeq;
+    @Order(2)
+    long updateSeq;
+
+    /** Partition map. */
+    @Order(3)
+    Map<UUID, GridDhtPartitionMap> map;
 
     /**
      * @param nodeId Node ID.
@@ -59,6 +74,7 @@ public class GridDhtPartitionFullMap
         this.nodeId = nodeId;
         this.nodeOrder = nodeOrder;
         this.updateSeq = updateSeq;
+        map = new HashMap<>();
     }
 
     /**
@@ -77,6 +93,7 @@ public class GridDhtPartitionFullMap
         this.nodeId = nodeId;
         this.nodeOrder = nodeOrder;
         this.updateSeq = updateSeq;
+        map = new HashMap<>();
 
         for (Map.Entry<UUID, GridDhtPartitionMap> e : m.entrySet()) {
             GridDhtPartitionMap part = e.getValue();
@@ -96,7 +113,7 @@ public class GridDhtPartitionFullMap
      * @param updateSeq Update sequence.
      */
     public GridDhtPartitionFullMap(GridDhtPartitionFullMap m, long updateSeq) {
-        super(m);
+        map = new HashMap<>(m);
 
         nodeId = m.nodeId;
         nodeOrder = m.nodeOrder;
@@ -107,7 +124,7 @@ public class GridDhtPartitionFullMap
      * Empty constructor required for {@link Externalizable}.
      */
     public GridDhtPartitionFullMap() {
-        // No-op.
+        map = new HashMap<>();
     }
 
     /**
@@ -185,7 +202,7 @@ public class GridDhtPartitionFullMap
         out.writeLong(nodeOrder);
         out.writeLong(updateSeq);
 
-        U.writeMap(out, this);
+        U.writeMap(out, map);
     }
 
     /** {@inheritDoc} */
@@ -195,7 +212,12 @@ public class GridDhtPartitionFullMap
         nodeOrder = in.readLong();
         updateSeq = in.readLong();
 
-        putAll(U.<UUID, GridDhtPartitionMap>readMap(in));
+        map = new HashMap<>();
+
+        Map<UUID, GridDhtPartitionMap> map0 = U.readMap(in);
+
+        if (map0 != null)
+            map.putAll(map0);
     }
 
     /** {@inheritDoc} */
@@ -260,4 +282,69 @@ public class GridDhtPartitionFullMap
 
         return Long.compare(updateSeq, o.updateSeq);
     }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionMap put(UUID key, GridDhtPartitionMap 
val) {
+        return map.put(key, val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void putAll(Map<? extends UUID, ? extends 
GridDhtPartitionMap> m) {
+        map.putAll(m);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionMap get(Object key) {
+        return map.get(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtPartitionMap remove(Object key) {
+        return map.remove(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return map.size();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isEmpty() {
+        return map.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean containsValue(Object val) {
+        return map.containsValue(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean containsKey(Object key) {
+        return map.containsKey(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clear() {
+        map.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Set<UUID> keySet() {
+        return map.keySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Collection<GridDhtPartitionMap> values() {
+        return map.values();
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull Set<Entry<UUID, GridDhtPartitionMap>> entrySet() 
{
+        return map.entrySet();
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
index 3cbf75767e7..b343d55d11b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionMap.java
@@ -26,31 +26,40 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.util.GridPartitionStateMap;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
 
 /**
  * Partition map from single node.
  */
-public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, 
Externalizable {
+public class GridDhtPartitionMap implements Comparable<GridDhtPartitionMap>, 
Externalizable, Message {
+    /** Type code. */
+    public static final short TYPE_CODE = 518;
+
     /** */
     private static final long serialVersionUID = 0L;
 
     /** Node ID. */
+    @Order(0)
     protected UUID nodeId;
 
     /** Update sequence number. */
+    @Order(1)
     protected long updateSeq;
 
     /** Topology version. */
+    @Order(2)
     protected AffinityTopologyVersion top;
 
     /** */
+    @Order(value = 3, method = "map")
     protected GridPartitionStateMap map;
 
     /** */
@@ -197,6 +206,17 @@ public class GridDhtPartitionMap implements 
Comparable<GridDhtPartitionMap>, Ext
         return map;
     }
 
+    /**
+     * @param map Partitions state map.
+     */
+    public void map(GridPartitionStateMap map) {
+        this.map = new GridPartitionStateMap();
+
+        if (map != null)
+            for (Map.Entry<Integer, GridDhtPartitionState> entry : 
map.entrySet())
+                put(entry.getKey(), entry.getValue());
+    }
+
     /**
      * @return Node ID.
      */
@@ -337,4 +357,9 @@ public class GridDhtPartitionMap implements 
Comparable<GridDhtPartitionMap>, Ext
     @Override public String toString() {
         return S.toString(GridDhtPartitionMap.class, this, "top", top, 
"updateSeq", updateSeq, "size", size());
     }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
index c03ef06bac7..6170e30e20a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java
@@ -28,9 +28,6 @@ import org.jetbrains.annotations.Nullable;
  * Request for single partition info.
  */
 public abstract class GridDhtPartitionsAbstractMessage extends 
GridCacheMessage {
-    /** */
-    private static final byte COMPRESSED_FLAG_MASK = 0x01;
-
     /** */
     private static final byte RESTORE_STATE_FLAG_MASK = 0x02;
 
@@ -107,20 +104,6 @@ public abstract class GridDhtPartitionsAbstractMessage 
extends GridCacheMessage
         return lastVer;
     }
 
-    /**
-     * @return {@code True} if message data is compressed.
-     */
-    public final boolean compressed() {
-        return (flags & COMPRESSED_FLAG_MASK) != 0;
-    }
-
-    /**
-     * @param compressed {@code True} if message data is compressed.
-     */
-    public final void compressed(boolean compressed) {
-        flags = compressed ? (byte)(flags | COMPRESSED_FLAG_MASK) : 
(byte)(flags & ~COMPRESSED_FLAG_MASK);
-    }
-
     /**
      * @param restoreState Restore exchange state flag.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 3456d33cbab..abf0d704c73 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -2117,8 +2117,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() 
&& exchangeLocE != null)) {
             msg = new GridDhtPartitionsSingleMessage(exchangeId(),
                 cctx.kernalContext().clientNode(),
-                cctx.versions().last(),
-                true);
+                cctx.versions().last());
         }
         else {
             msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
@@ -2165,14 +2164,12 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @param compress Message compress flag.
      * @return Message.
      */
-    private GridDhtPartitionsFullMessage createPartitionsMessage(boolean 
compress) {
+    private GridDhtPartitionsFullMessage createPartitionsMessage() {
         GridCacheVersion last = lastVer.get();
 
         GridDhtPartitionsFullMessage m = 
cctx.exchange().createPartitionsFullMessage(
-            compress,
             exchangeId(),
             last != null ? last : cctx.versions().last(),
             partHistSuppliers,
@@ -2958,7 +2955,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
         onDone(null, reconnectEx);
 
-        GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true);
+        GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage();
 
         fullMsg.setErrorsMap(exchangeGlobalExceptions);
 
@@ -3119,7 +3116,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                         return;
                     }
 
-                    GridDhtPartitionsFullMessage msg = 
createPartitionsMessage(true);
+                    GridDhtPartitionsFullMessage msg = 
createPartitionsMessage();
 
                     msg.rebalanced(rebalanced());
 
@@ -3289,7 +3286,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             Map<Integer, Map<Integer, List<UUID>>> assignmentChange = 
fut.get();
 
-            GridDhtPartitionsFullMessage m = createPartitionsMessage(false);
+            GridDhtPartitionsFullMessage m = createPartitionsMessage();
 
             CacheAffinityChangeMessage msg = new 
CacheAffinityChangeMessage(exchId, m, assignmentChange);
 
@@ -3858,7 +3855,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
 
             cctx.versions().onExchange(lastVer.get().order());
 
-            GridDhtPartitionsFullMessage msg = createPartitionsMessage(true);
+            GridDhtPartitionsFullMessage msg = createPartitionsMessage();
 
             if (!cctx.affinity().rebalanceRequired() && !deactivateCluster())
                 msg.rebalanced(true);
@@ -4420,8 +4417,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                 if (dynamicCacheStartExchange() && exchangeLocE != null) {
                     res = new 
GridDhtPartitionsSingleMessage(msg.restoreExchangeId(),
                         cctx.kernalContext().clientNode(),
-                        cctx.versions().last(),
-                        true);
+                        cctx.versions().last());
 
                     res.setError(exchangeLocE);
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0a6da4ecc88..872629ddf19 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -29,21 +27,19 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.Compress;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.communication.ErrorMessage;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -60,13 +56,10 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     @GridToStringInclude
     private Map<Integer, GridDhtPartitionFullMap> parts;
 
-    /**
-     * Serialized local partitions.
-     * <p>
-     * TODO Remove this field after completing task IGNITE-26976.
-     */
+    /** Partitions without duplicated data. */
     @Order(6)
-    byte[] partsBytes;
+    @Compress
+    Map<Integer, GridDhtPartitionFullMap> locParts;
 
     /** */
     @Order(7)
@@ -74,16 +67,19 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
     /** Partitions update counters. */
     @Order(8)
+    @Compress
     @GridToStringInclude
     IgniteDhtPartitionCountersMap partCntrs;
 
     /** Partitions history suppliers. */
     @Order(9)
+    @Compress
     @GridToStringInclude
     IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
 
     /** Partitions that must be cleared and re-loaded. */
     @Order(10)
+    @Compress
     @GridToStringInclude
     IgniteDhtPartitionsToReloadMap partsToReload;
 
@@ -104,8 +100,9 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
      * All logic resides within getter and setter.
      */
     @Order(value = 13, method = "errorMessages")
+    @Compress
     @SuppressWarnings("unused")
-    Map<UUID, ErrorMessage> errMsgs;
+    private Map<UUID, ErrorMessage> errMsgs;
 
     /** */
     @Order(14)
@@ -162,25 +159,9 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
 
         GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg;
 
-        if (parts != null) {
-            cp.parts = new HashMap<>(parts.size());
-
-            for (Map.Entry<Integer, GridDhtPartitionFullMap> e : 
parts.entrySet()) {
-                GridDhtPartitionFullMap val = e.getValue();
-
-                cp.parts.put(e.getKey(), new GridDhtPartitionFullMap(
-                    val.nodeId(),
-                    val.nodeOrder(),
-                    val.updateSequence(),
-                    val,
-                    false));
-            }
-        }
-        else
-            cp.parts = null;
-
+        cp.parts = parts != null ? copyPartitionsMap(parts) : null;
         cp.dupPartsData = dupPartsData;
-        cp.partsBytes = partsBytes;
+        cp.locParts = locParts;
         cp.partCntrs = partCntrs;
         cp.partHistSuppliers = partHistSuppliers;
         cp.partsToReload = partsToReload;
@@ -280,7 +261,6 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
             parts.put(grpId, fullMap);
 
             if (dupDataCache != null) {
-                assert compressed();
                 assert parts.containsKey(dupDataCache);
 
                 if (dupPartsData == null)
@@ -419,37 +399,8 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        boolean marshal = !F.isEmpty(parts) && partsBytes == null;
-
-        if (marshal) {
-            // Reserve at least 2 threads for system operations.
-            int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), 
GridIoPolicy.SYSTEM_POOL, 2);
-
-            Collection<Object> objectsToMarshall = new ArrayList<>();
-
-            if (!F.isEmpty(parts) && partsBytes == null)
-                objectsToMarshall.add(parts);
-
-            Collection<byte[]> marshalled = U.doInParallel(
-                parallelismLvl,
-                ctx.kernalContext().pools().getSystemExecutorService(),
-                objectsToMarshall,
-                new IgniteThrowableFunction<Object, byte[]>() {
-                    @Override public byte[] apply(Object payload) throws 
IgniteCheckedException {
-                        byte[] marshalled = U.marshal(ctx, payload);
-
-                        if (compressed())
-                            marshalled = U.zip(marshalled, 
ctx.gridConfig().getNetworkCompressionLevel());
-
-                        return marshalled;
-                    }
-                });
-
-            Iterator<byte[]> iter = marshalled.iterator();
-
-            if (!F.isEmpty(parts) && partsBytes == null)
-                partsBytes = iter.next();
-        }
+        if (!F.isEmpty(parts) && locParts == null)
+            locParts = copyPartitionsMap(parts);
     }
 
     /**
@@ -470,37 +421,10 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
     @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        ClassLoader clsLdr = U.resolveClassLoader(ldr, ctx.gridConfig());
-
-        Collection<byte[]> objectsToUnmarshall = new ArrayList<>();
-
-        // Reserve at least 2 threads for system operations.
-        int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), 
GridIoPolicy.SYSTEM_POOL, 2);
-
-        if (partsBytes != null && parts == null)
-            objectsToUnmarshall.add(partsBytes);
-
-        Collection<Object> unmarshalled = U.doInParallel(
-            parallelismLvl,
-            ctx.kernalContext().pools().getSystemExecutorService(),
-            objectsToUnmarshall,
-            new IgniteThrowableFunction<byte[], Object>() {
-                @Override public Object apply(byte[] binary) throws 
IgniteCheckedException {
-                    return compressed()
-                        ? U.unmarshalZip(ctx.marshaller(), binary, clsLdr)
-                        : U.unmarshal(ctx, binary, clsLdr);
-                }
-            }
-        );
-
-        Iterator<Object> iter = unmarshalled.iterator();
-
-        if (partsBytes != null && parts == null) {
-            parts = (Map<Integer, GridDhtPartitionFullMap>)iter.next();
+        if (locParts != null && parts == null) {
+            parts = copyPartitionsMap(locParts);
 
             if (dupPartsData != null) {
-                assert parts != null;
-
                 for (Map.Entry<Integer, Integer> e : dupPartsData.entrySet()) {
                     GridDhtPartitionFullMap map1 = parts.get(e.getKey());
                     GridDhtPartitionFullMap map2 = parts.get(e.getValue());
@@ -588,7 +512,23 @@ public class GridDhtPartitionsFullMessage extends 
GridDhtPartitionsAbstractMessa
      * Cleans up resources to avoid excessive memory usage.
      */
     public void cleanUp() {
-        partsBytes = null;
+        locParts = null;
         partCntrs = null;
     }
+
+    /** */
+    private Map<Integer, GridDhtPartitionFullMap> 
copyPartitionsMap(Map<Integer, GridDhtPartitionFullMap> src) {
+        Map<Integer, GridDhtPartitionFullMap> map = new HashMap<>(src.size());
+
+        for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : 
src.entrySet()) {
+            GridDhtPartitionFullMap val = entry.getValue();
+
+            map.put(
+                entry.getKey(),
+                new GridDhtPartitionFullMap(val.nodeId(), val.nodeOrder(), 
val.updateSequence(), val, false)
+            );
+        }
+
+        return map;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 53d9918569a..840a0f2da54 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.Compress;
 import org.apache.ignite.internal.Order;
 import org.apache.ignite.internal.managers.communication.ErrorMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -30,7 +31,6 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -39,17 +39,11 @@ import org.jetbrains.annotations.Nullable;
  * Sent in response to {@link GridDhtPartitionsSingleRequest} and during 
processing partitions exchange future.
  */
 public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMessage {
-    /** Local partitions. Serialized as {@link #partsBytes}, may be 
compressed. */
-    @GridToStringInclude
-    private Map<Integer, GridDhtPartitionMap> parts;
-
-    /**
-     * Serialized local partitions. Unmarshalled to {@link #parts}.
-     * <p>
-     * TODO Remove this field after completing task IGNITE-26976.
-     */
+    /** Local partitions. */
     @Order(6)
-    byte[] partsBytes;
+    @Compress
+    @GridToStringInclude
+    Map<Integer, GridDhtPartitionMap> parts;
 
     /** */
     @Order(7)
@@ -57,16 +51,19 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
 
     /** Partitions update counters. */
     @Order(8)
+    @Compress
     @GridToStringInclude
     Map<Integer, CachePartitionPartialCountersMap> partCntrs;
 
     /** Partitions sizes. */
     @Order(9)
+    @Compress
     @GridToStringInclude
     Map<Integer, IntLongMap> partsSizes;
 
     /** Partitions history reservation counters. */
     @Order(10)
+    @Compress
     @GridToStringInclude
     Map<Integer, IntLongMap> partHistCntrs;
 
@@ -105,17 +102,13 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
      * @param exchId Exchange ID.
      * @param client Client message flag.
      * @param lastVer Last version.
-     * @param compress {@code True} if it is possible to use compression for 
message.
      */
     public GridDhtPartitionsSingleMessage(GridDhtPartitionExchangeId exchId,
         boolean client,
-        @Nullable GridCacheVersion lastVer,
-        boolean compress
+        @Nullable GridCacheVersion lastVer
     ) {
         super(exchId, lastVer);
 
-        compressed(compress);
-
         this.client = client;
     }
 
@@ -166,7 +159,6 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         parts.put(cacheId, locMap);
 
         if (dupDataCache != null) {
-            assert compressed();
             assert F.isEmpty(locMap.map());
             assert parts.containsKey(dupDataCache);
 
@@ -303,36 +295,10 @@ public class GridDhtPartitionsSingleMessage extends 
GridDhtPartitionsAbstractMes
         this.exchangeStartTime = exchangeStartTime;
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        super.prepareMarshal(ctx);
-
-        if (parts != null && partsBytes == null) {
-            byte[] partsBytes0 = U.marshal(ctx, parts);
-
-            if (compressed()) {
-                try {
-                    partsBytes0 = U.zip(partsBytes0);
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(ctx.logger(getClass()), "Failed to compress 
partitions data: " + e, e);
-                }
-            }
-
-            partsBytes = partsBytes0;
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx, 
ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (partsBytes != null && parts == null) {
-            parts = compressed()
-                ? U.unmarshalZip(ctx.marshaller(), partsBytes, 
U.resolveClassLoader(ldr, ctx.gridConfig()))
-                : U.unmarshal(ctx, partsBytes, U.resolveClassLoader(ldr, 
ctx.gridConfig()));
-        }
-
         if (dupPartsData != null) {
             assert parts != null;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
index 855d3c79ef4..6db6f53ec62 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/GridPartitionStateMap.java
@@ -24,17 +24,22 @@ import java.util.BitSet;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import org.apache.ignite.internal.Order;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
  * Grid partition state map. States are encoded using bits.
  * <p>
  * Null values are prohibited.
  */
-public class GridPartitionStateMap extends AbstractMap<Integer, 
GridDhtPartitionState> implements Serializable {
+public class GridPartitionStateMap extends AbstractMap<Integer, 
GridDhtPartitionState> implements Serializable, Message {
     /** Empty map. */
     public static final GridPartitionStateMap EMPTY = new 
GridPartitionStateMap(0);
 
+    /** Type code. */
+    public static final short TYPE_CODE = 517;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -55,10 +60,12 @@ public class GridPartitionStateMap extends 
AbstractMap<Integer, GridDhtPartition
      * The first element takes the first {@link GridPartitionStateMap#BITS} 
bits in reverse order,
      * the second element next {@link GridPartitionStateMap#BITS} bits in 
reverse order, etc.
      */
-    private final BitSet states;
+    @Order(0)
+    BitSet states;
 
     /** */
-    private int size;
+    @Order(1)
+    int size;
 
     /** {@inheritDoc} */
     @Override public Set<Entry<Integer, GridDhtPartitionState>> entrySet() {
@@ -243,4 +250,9 @@ public class GridPartitionStateMap extends 
AbstractMap<Integer, GridDhtPartition
     @Override public int hashCode() {
         return 31 * states.hashCode() + size;
     }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return TYPE_CODE;
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index c8cad7c0df6..eda96716fac 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -199,7 +199,18 @@ public interface MessageReader {
      * @param <T> Type of the message.
      * @return Message.
      */
-    public <T extends Message> T readMessage();
+    public default <T extends Message> T readMessage() {
+        return readMessage(false);
+    }
+
+    /**
+     * Reads nested message.
+     *
+     * @param compress Whether message should be decompressed.
+     * @param <T> Type of the message.
+     * @return Message.
+     */
+    public <T extends Message> T readMessage(boolean compress);
 
     /**
      * Reads {@link CacheObject}.
@@ -259,9 +270,25 @@ public interface MessageReader {
      * @param <M> Type of the read map.
      * @return Map.
      */
-    // TODO: IGNITE-26329 — switch to the new readMap method without the flag 
parameter
+    // TODO: IGNITE-26329 — switch to the new readMap method without the 
linked flag parameter
+    public default <M extends Map<?, ?>> M readMap(MessageCollectionItemType 
keyType,
+        MessageCollectionItemType valType, boolean linked) {
+        return readMap(keyType, valType, linked, false);
+    }
+
+    /**
+     * Reads map.
+     *
+     * @param keyType Map key type.
+     * @param valType Map value type.
+     * @param linked Whether {@link LinkedHashMap} should be created.
+     * @param compress Whether map should be compressed.
+     * @param <M> Type of the read map.
+     * @return Map.
+     */
+    // TODO: IGNITE-26329 — switch to the new readMap method without the 
linked flag parameter
     public <M extends Map<?, ?>> M readMap(MessageCollectionItemType keyType,
-        MessageCollectionItemType valType, boolean linked);
+        MessageCollectionItemType valType, boolean linked, boolean compress);
 
     /**
      * Tells whether last invocation of any of {@code readXXX(...)}
@@ -284,6 +311,11 @@ public interface MessageReader {
      */
     public void incrementState();
 
+    /**
+     * Decrements read state.
+     */
+    public void decrementState();
+
     /**
      * Callback called before inner message is read.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 43fda9736d9..e073ca83134 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -247,7 +247,18 @@ public interface MessageWriter {
      * @param val Message.
      * @return Whether value was fully written.
      */
-    public boolean writeMessage(Message val);
+    public default boolean writeMessage(Message val) {
+        return writeMessage(val, false);
+    }
+
+    /**
+     * Writes nested message.
+     *
+     * @param val Message.
+     * @param compress Whether message should be compressed.
+     * @return Whether value was fully written.
+     */
+    public boolean writeMessage(Message val, boolean compress);
 
     /**
      * Writes {@link CacheObject}.
@@ -313,8 +324,24 @@ public interface MessageWriter {
      * @param <V> Initial value types of the map to write.
      * @return Whether value was fully written.
      */
+    public default <K, V> boolean writeMap(Map<K, V> map, 
MessageCollectionItemType keyType,
+        MessageCollectionItemType valType) {
+        return writeMap(map, keyType, valType, false);
+    }
+
+    /**
+     * Writes map.
+     *
+     * @param map Map.
+     * @param keyType Map key type.
+     * @param valType Map value type.
+     * @param compress Whether map should be compressed.
+     * @param <K> Initial key types of the map to write.
+     * @param <V> Initial value types of the map to write.
+     * @return Whether value was fully written.
+     */
     public <K, V> boolean writeMap(Map<K, V> map, MessageCollectionItemType 
keyType,
-        MessageCollectionItemType valType);
+        MessageCollectionItemType valType, boolean compress);
 
     /**
      * @return Whether header of current message is already written.
@@ -338,6 +365,11 @@ public interface MessageWriter {
      */
     public void incrementState();
 
+    /**
+     * Decrements state.
+     */
+    public void decrementState();
+
     /**
      * Callback called before inner message is written.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index 95320a94f81..97a048a508d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -878,7 +878,7 @@ public class GridNioServerWrapper {
                     @Override public MessageWriter writer(GridNioSession ses) 
throws IgniteCheckedException {
                         // Enable sending wait message for a communication 
peer while context isn't initialized.
                         if (!stateProvider.spiContextAvailable())
-                            return new DirectMessageWriter(msgFactory);
+                            return new DirectMessageWriter(msgFactory, 
igniteCfg.getNetworkCompressionLevel());
 
                         final IgniteSpiContext ctx = 
stateProvider.getSpiContextWithoutInitialLatch();
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
index 7ddf1fa7e24..019364513a2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/codegen/MessageProcessorTest.java
@@ -270,6 +270,40 @@ public class MessageProcessorTest {
         assertThat(compilation).succeeded();
     }
 
+    /**
+     * Negative test that verifies the compilation failed if the 
CompressedMessage type is used in Message.
+     */
+    @Test
+    public void testCompressedMessageExplicitUsageFails() {
+        String errMsg = "CompressedMessage should not be used explicitly. To 
compress the required field use the @Compress annotation.";
+
+        Compilation compilation = compile("TestCompressedMessage.java");
+
+        assertThat(compilation).failed();
+        assertThat(compilation).hadErrorContaining(errMsg);
+
+        compilation = compile("TestCollectionsCompressedMessage.java");
+
+        assertThat(compilation).failed();
+        assertThat(compilation).hadErrorContaining(errMsg);
+
+        compilation = compile("TestMapCompressedMessage.java");
+
+        assertThat(compilation).failed();
+        assertThat(compilation).hadErrorContaining(errMsg);
+    }
+
+    /**
+     * Negative test that verifies the compilation failed if the Compress 
annotation is used for unsupported types.
+     */
+    @Test
+    public void testCompressAnnotationFailsForUnsupportedTypes() {
+        Compilation compilation = 
compile("TestCompressUnsupportedTypeMessage.java");
+
+        assertThat(compilation).failed();
+        assertThat(compilation).hadErrorContaining("Compress annotation is 
used for an unsupported type: java.util.List");
+    }
+
     /** */
     private Compilation compile(String... srcFiles) {
         return compile(new MessageProcessor(), srcFiles);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
index 4875ec0a465..317ff20a961 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractMessageSerializationTest.java
@@ -286,7 +286,7 @@ public abstract class AbstractMessageSerializationTest {
         }
 
         /** {@inheritDoc} */
-        @Override public boolean writeMessage(Message val) {
+        @Override public boolean writeMessage(Message val, boolean compress) {
             return writeField(Message.class);
         }
 
@@ -307,7 +307,7 @@ public abstract class AbstractMessageSerializationTest {
 
         /** {@inheritDoc} */
         @Override public <K, V> boolean writeMap(Map<K, V> map, 
MessageCollectionItemType keyType,
-            MessageCollectionItemType valType) {
+            MessageCollectionItemType valType, boolean compress) {
             return writeField(Map.class);
         }
 
@@ -329,6 +329,11 @@ public abstract class AbstractMessageSerializationTest {
             ++state;
         }
 
+        /** {@inheritDoc} */
+        @Override public void decrementState() {
+            --state;
+        }
+
         /** {@inheritDoc} */
         @Override public void beforeInnerMessageWrite() {}
 
@@ -518,7 +523,7 @@ public abstract class AbstractMessageSerializationTest {
         }
 
         /** {@inheritDoc} */
-        @Override public <T extends Message> T readMessage() {
+        @Override public <T extends Message> T readMessage(boolean compress) {
             readField(Message.class);
 
             return null;
@@ -568,7 +573,7 @@ public abstract class AbstractMessageSerializationTest {
 
         /** {@inheritDoc} */
         @Override public <M extends Map<?, ?>> M 
readMap(MessageCollectionItemType keyType,
-            MessageCollectionItemType valType, boolean linked) {
+            MessageCollectionItemType valType, boolean linked, boolean 
compress) {
             readField(Map.class);
 
             return null;
@@ -594,6 +599,11 @@ public abstract class AbstractMessageSerializationTest {
             ++state;
         }
 
+        /** {@inheritDoc} */
+        @Override public void decrementState() {
+            --state;
+        }
+
         /** {@inheritDoc} */
         @Override public void beforeInnerMessageRead() {}
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
new file mode 100644
index 00000000000..270fe7e47b3
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CompressedMessageTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.direct.DirectMessageReader;
+import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.direct.state.DirectMessageState;
+import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionsToReloadMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionReservationsMap;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsToReload;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import 
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Test for {@link CompressedMessage}. */
+public class CompressedMessageTest {
+    /** */
+    @Test
+    public void testWriteReadHugeMessage() {
+        MessageFactory msgFactory = new IgniteMessageFactoryImpl(new 
MessageFactoryProvider[]{new GridIoMessageFactory()});
+
+        DirectMessageWriter writer = new DirectMessageWriter(msgFactory);
+
+        ByteBuffer tmpBuf = ByteBuffer.allocate(4096);
+
+        writer.setBuffer(tmpBuf);
+
+        GridDhtPartitionsFullMessage fullMsg = fullMessage();
+
+        boolean finished = false;
+        boolean checkChunkCnt = true;
+
+        int cnt = 0;
+
+        ByteBuffer msgBuf = ByteBuffer.allocate(40_960);
+
+        while (!finished) {
+            finished = writer.writeMessage(fullMsg, true);
+
+            if (checkChunkCnt) {
+                DirectMessageState<?> state = U.field(writer, "state");
+                DirectByteBufferStream stream = U.field(state.item(), 
"stream");
+
+                CompressedMessage compressedMsg = stream.compressedMessage();
+
+                assertTrue(compressedMsg.dataSize() > 0);
+
+                byte[] compressedData = U.field((Object)U.field(compressedMsg, 
"chunkedReader"), "inputData");
+
+                assertTrue(compressedData.length > 
CompressedMessage.CHUNK_SIZE * 2);
+
+                checkChunkCnt = false;
+            }
+
+            tmpBuf.flip();
+
+            msgBuf.put(tmpBuf);
+
+            tmpBuf.clear();
+
+            cnt++;
+        }
+
+        assertTrue(cnt > 2);
+
+        msgBuf.flip();
+
+        DirectMessageReader reader = new DirectMessageReader(msgFactory, null);
+
+        reader.setBuffer(msgBuf);
+
+        Message readMsg = reader.readMessage(true);
+
+        assertTrue(readMsg instanceof GridDhtPartitionsFullMessage);
+
+        assertEqualsFullMsg(fullMsg, (GridDhtPartitionsFullMessage)readMsg);
+    }
+
+    /** */
+    private GridDhtPartitionsFullMessage fullMessage() {
+        IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new 
IgniteDhtPartitionHistorySuppliersMap();
+        IgniteDhtPartitionsToReloadMap partsToReload = new 
IgniteDhtPartitionsToReloadMap();
+
+        for (int i = 0; i < 500; i++) {
+            UUID uuid = UUID.randomUUID();
+
+            partHistSuppliers.put(uuid, i, i + 1, i + 2);
+            partsToReload.put(uuid, i, i + 1);
+        }
+
+        return new GridDhtPartitionsFullMessage(null, null, new 
AffinityTopologyVersion(0), partHistSuppliers, partsToReload);
+    }
+
+    /** */
+    private void assertEqualsFullMsg(GridDhtPartitionsFullMessage expected, 
GridDhtPartitionsFullMessage actual) {
+        Map<UUID, PartitionReservationsMap> expHistSuppliers = 
U.field(expected.partitionHistorySuppliers(), "map");
+        Map<UUID, PartitionReservationsMap> actHistSuppliers = 
U.field(actual.partitionHistorySuppliers(), "map");
+
+        assertEquals(expHistSuppliers.size(), actHistSuppliers.size());
+
+        for (Map.Entry<UUID, PartitionReservationsMap> entry : 
expHistSuppliers.entrySet())
+            assertEquals(entry.getValue().reservations(), 
actHistSuppliers.get(entry.getKey()).reservations());
+
+        Map<UUID, CachePartitionsToReloadMap> expPartsToReload = 
U.field((Object)U.field(expected, "partsToReload"), "map");
+        Map<UUID, CachePartitionsToReloadMap> actPartsToReload = 
U.field((Object)U.field(actual, "partsToReload"), "map");
+
+        assertEquals(expPartsToReload.size(), actPartsToReload.size());
+
+        for (Map.Entry<UUID, CachePartitionsToReloadMap> entry : 
expPartsToReload.entrySet()) {
+            Map<Integer, PartitionsToReload> expCachePartitions = 
U.field(entry.getValue(), "map");
+            Map<Integer, PartitionsToReload> actCachePartitions = 
U.field(actPartsToReload.get(entry.getKey()), "map");
+
+            assertEquals(expCachePartitions.size(), actCachePartitions.size());
+
+            for (Map.Entry<Integer, PartitionsToReload> partsEntry : 
expCachePartitions.entrySet())
+                assertEquals(partsEntry.getValue().partitions(), 
actCachePartitions.get(partsEntry.getKey()).partitions());
+        }
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 4bb13394dcf..6763eee53fa 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.TransactionsMXBeanImplTest;
 import 
org.apache.ignite.internal.codegen.IgniteDataTransferObjectProcessorTest;
 import org.apache.ignite.internal.codegen.MessageProcessorTest;
 import 
org.apache.ignite.internal.managers.communication.CacheEntryPredicateAdapterMessageTest;
+import org.apache.ignite.internal.managers.communication.CompressedMessageTest;
 import org.apache.ignite.internal.managers.communication.DefaultEnumMapperTest;
 import org.apache.ignite.internal.managers.communication.ErrorMessageSelfTest;
 import 
org.apache.ignite.internal.managers.communication.IndexKeyTypeMessageTest;
@@ -154,6 +155,7 @@ import org.junit.runners.Suite;
     DefaultEnumMapperTest.class,
     IndexKeyTypeMessageTest.class,
     IgniteDataTransferObjectProcessorTest.class,
+    CompressedMessageTest.class
 })
 public class IgniteBasicTestSuite {
 }
diff --git 
a/modules/core/src/test/resources/codegen/TestCollectionsCompressedMessage.java 
b/modules/core/src/test/resources/codegen/TestCollectionsCompressedMessage.java
new file mode 100644
index 00000000000..0eac72c3549
--- /dev/null
+++ 
b/modules/core/src/test/resources/codegen/TestCollectionsCompressedMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.List;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestCollectionsCompressedMessage implements Message {
+    @Order(0)
+    List<CompressedMessage> messageList;
+
+    public short directType() {
+        return 0;
+    }
+}
diff --git 
a/modules/core/src/test/resources/codegen/TestCompressUnsupportedTypeMessage.java
 
b/modules/core/src/test/resources/codegen/TestCompressUnsupportedTypeMessage.java
new file mode 100644
index 00000000000..2c5e96bdab4
--- /dev/null
+++ 
b/modules/core/src/test/resources/codegen/TestCompressUnsupportedTypeMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.List;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestCompressUnsupportedTypeMessage implements Message {
+    @Order(0)
+    @Compress
+    List<String> message;
+
+    public short directType() {
+        return 0;
+    }
+}
\ No newline at end of file
diff --git a/modules/core/src/test/resources/codegen/TestCompressedMessage.java 
b/modules/core/src/test/resources/codegen/TestCompressedMessage.java
new file mode 100644
index 00000000000..2d96ca53e70
--- /dev/null
+++ b/modules/core/src/test/resources/codegen/TestCompressedMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestCompressedMessage implements Message {
+    @Order(0)
+    CompressedMessage message;
+
+    public short directType() {
+        return 0;
+    }
+}
diff --git 
a/modules/core/src/test/resources/codegen/TestMapCompressedMessage.java 
b/modules/core/src/test/resources/codegen/TestMapCompressedMessage.java
new file mode 100644
index 00000000000..c76c5cc301f
--- /dev/null
+++ b/modules/core/src/test/resources/codegen/TestMapCompressedMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.Map;
+import org.apache.ignite.internal.managers.communication.CompressedMessage;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+public class TestMapCompressedMessage implements Message {
+    @Order(0)
+    Map<String, CompressedMessage> stringMessageMap;
+
+    public short directType() {
+        return 0;
+    }
+}

Reply via email to