This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 406144b  IGNITE-13515 Performance drop when there are many thin 
clients per server. This closes #8307
406144b is described below

commit 406144b33df3526a6055356333c158c8f89a38ce
Author: Igor Seliverstov <gvvinbl...@gmail.com>
AuthorDate: Tue Oct 6 15:05:14 2020 +0300

    IGNITE-13515 Performance drop when there are many thin clients per server. 
This closes #8307
---
 .../org/apache/ignite/IgniteSystemProperties.java  |  10 +-
 .../ClientConnectorConfiguration.java              |  38 ++-
 .../internal/binary/BinaryThreadLocalContext.java  |   2 +-
 .../binary/streams/BinaryHeapOutputStream.java     |   2 +-
 .../binary/streams/BinaryMemoryAllocator.java      | 280 ++++++++++++++++++---
 .../binary/streams/BinaryMemoryAllocatorChunk.java |  73 +-----
 .../internal/client/thin/TcpClientChannel.java     | 137 +++++++---
 .../odbc/ClientListenerMessageParser.java          |   8 +-
 .../processors/odbc/ClientListenerNioListener.java |  14 +-
 ...er.java => ClientListenerNioMessageParser.java} |  68 +++--
 .../odbc/ClientListenerNioServerBuffer.java        | 113 ---------
 .../processors/odbc/ClientListenerProcessor.java   |  25 +-
 .../internal/processors/odbc/ClientMessage.java    | 189 ++++++++++++++
 .../odbc/jdbc/JdbcConnectionContext.java           |   4 +-
 .../processors/odbc/jdbc/JdbcMessageParser.java    |  19 +-
 .../odbc/odbc/OdbcConnectionContext.java           |   4 +-
 .../processors/odbc/odbc/OdbcMessageParser.java    |  19 +-
 .../platform/client/ClientMessageParser.java       |  18 +-
 .../internal/util/io/GridUnsafeDataInput.java      |   2 +-
 .../internal/util/io/GridUnsafeDataOutput.java     |   2 +-
 .../ignite/internal/util/nio/GridNioServer.java    |  41 +--
 .../internal/binary/BinaryMarshallerSelfTest.java  |  14 +-
 22 files changed, 718 insertions(+), 364 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 10197d0..7aa506b 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -53,7 +53,8 @@ import static 
org.apache.ignite.internal.IgniteKernal.DFLT_PERIODIC_STARVATION_C
 import static 
org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD;
 import static 
org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT;
 import static 
org.apache.ignite.internal.LongJVMPauseDetector.DFLT_JVM_PAUSE_DETECTOR_PRECISION;
-import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE;
+import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
 import static 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager.DFLT_DISCOVERY_HISTORY_SIZE;
 import static 
org.apache.ignite.internal.processors.affinity.AffinityAssignment.DFLT_AFFINITY_BACKUPS_THRESHOLD;
 import static 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache.DFLT_AFFINITY_HISTORY_SIZE;
@@ -529,6 +530,13 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_MARSHAL_BUFFERS_RECHECK = 
"IGNITE_MARSHAL_BUFFERS_RECHECK";
 
     /**
+     * System property to specify per thread binary allocator chunk pool size. 
Default value is {@code 32}.
+     */
+    @SystemProperty(value = "Per thread binary allocator chunk pool size.",
+        type = Integer.class, defaults = "" + 
DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE)
+    public static final String IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = 
"IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE";
+
+    /**
      * System property to disable {@link HostnameVerifier} for SSL connections.
      * Can be used for development with self-signed certificates. Default 
value is {@code false}.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
 
b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
index a6cfcbd..b1f0c68 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java
@@ -45,6 +45,9 @@ public class ClientConnectorConfiguration {
     /** Default size of thread pool. */
     public static final int DFLT_THREAD_POOL_SIZE = 
IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT;
 
+    /** Default selector count. */
+    public static final int DFLT_SELECTOR_CNT = Math.max(4, 
Runtime.getRuntime().availableProcessors() / 2);
+
     /** Default handshake timeout. */
     public static final int DFLT_HANDSHAKE_TIMEOUT = 10_000;
 
@@ -78,6 +81,9 @@ public class ClientConnectorConfiguration {
     /** Thread pool size. */
     private int threadPoolSize = DFLT_THREAD_POOL_SIZE;
 
+    /** Selector count. */
+    private int selectorCnt = DFLT_SELECTOR_CNT;
+
     /** Idle timeout. */
     private long idleTimeout = DFLT_IDLE_TIMEOUT;
 
@@ -297,21 +303,21 @@ public class ClientConnectorConfiguration {
     }
 
     /**
-     * Size of thread pool that is in charge of processing SQL requests.
+     * Size of thread pool that is in charge of processing client requests.
      * <p>
      * Defaults {@link #DFLT_THREAD_POOL_SIZE}.
      *
-     * @return Thread pool that is in charge of processing SQL requests.
+     * @return Thread pool that is in charge of processing client requests.
      */
     public int getThreadPoolSize() {
         return threadPoolSize;
     }
 
     /**
-     * Sets thread pool that is in charge of processing SQL requests. See 
{@link #getThreadPoolSize()} for more
+     * Sets thread pool that is in charge of processing client requests. See 
{@link #getThreadPoolSize()} for more
      * information.
      *
-     * @param threadPoolSize Thread pool that is in charge of processing SQL 
requests.
+     * @param threadPoolSize Thread pool that is in charge of processing 
client requests.
      * @return This instance for chaining.
      */
     public ClientConnectorConfiguration setThreadPoolSize(int threadPoolSize) {
@@ -321,6 +327,30 @@ public class ClientConnectorConfiguration {
     }
 
     /**
+     * Get count of selectors to use in TCP server.
+     * <p>
+     * Defaults {@link #DFLT_SELECTOR_CNT}.
+     *
+     * @return Count of selectors to use in TCP server.
+     */
+    public int getSelectorCount() {
+        return selectorCnt;
+    }
+
+    /**
+     * Sets count of selectors to use in TCP server. See {@link 
#getSelectorCount()} for more
+     * information.
+     *
+     * @param selectorCnt Count of selectors to use in TCP server.
+     * @return This instance for chaining.
+     */
+    public ClientConnectorConfiguration setSelectorCount(int selectorCnt) {
+        this.selectorCnt = selectorCnt;
+
+        return this;
+    }
+
+    /**
      * Gets idle timeout for client connections.
      * If no packets come within idle timeout, the connection is closed.
      * Zero or negative means no timeout.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
index f8da90b..1b68207 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryThreadLocalContext.java
@@ -32,7 +32,7 @@ public class BinaryThreadLocalContext {
     };
 
     /** Memory chunk. */
-    private final BinaryMemoryAllocatorChunk chunk = 
BinaryMemoryAllocator.INSTANCE.chunk();
+    private final BinaryMemoryAllocatorChunk chunk = 
BinaryMemoryAllocator.THREAD_LOCAL.chunk();
 
     /** Schema holder. */
     private final BinaryWriterSchemaHolder schema = new 
BinaryWriterSchemaHolder();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
index 17bcdf6..0f411b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
@@ -37,7 +37,7 @@ public final class BinaryHeapOutputStream extends 
BinaryAbstractOutputStream {
      * @param cap Initial capacity.
      */
     public BinaryHeapOutputStream(int cap) {
-        this(cap, BinaryMemoryAllocator.INSTANCE.chunk());
+        this(cap, BinaryMemoryAllocator.THREAD_LOCAL.chunk());
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
index 5471bc5..8993fb3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocator.java
@@ -17,41 +17,265 @@
 
 package org.apache.ignite.internal.binary.streams;
 
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
+
 /**
- * Thread-local memory allocator.
+ * On-heap memory allocator.
  */
-public final class BinaryMemoryAllocator {
-    /** Memory allocator instance. */
-    public static final BinaryMemoryAllocator INSTANCE = new 
BinaryMemoryAllocator();
-
-    /** Holders. */
-    private static final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new 
ThreadLocal<>();
-
-    /**
-     * Ensures singleton.
-     */
-    private BinaryMemoryAllocator() {
-        // No-op.
-    }
+public abstract class BinaryMemoryAllocator {
+    /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */
+    public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000;
+
+    /** @see 
IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE */
+    public static final int DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE = 32;
+
+    /** Buffer size re-check frequency. */
+    private static final Long CHECK_FREQ = 
Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK);
+
+    /** */
+    private static final int POOL_SIZE = 
Integer.getInteger(IGNITE_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE, 
DFLT_MARSHAL_BUFFERS_PER_THREAD_POOL_SIZE);
+
+    /** Thread local allocator instance. */
+    public static final BinaryMemoryAllocator THREAD_LOCAL = new 
ThreadLocalAllocator();
+
+    /** Pooled allocator instance. */
+    public static final BinaryMemoryAllocator POOLED = new PooledAllocator();
+
+    /** */
+    public abstract BinaryMemoryAllocatorChunk chunk();
+
+    /** */
+    public abstract boolean isAcquired();
+
+    /** */
+    private static class ThreadLocalAllocator extends BinaryMemoryAllocator {
+        /** Holders. */
+        private final ThreadLocal<BinaryMemoryAllocatorChunk> holders = new 
ThreadLocal<>();
+
+        /** {@inheritDoc} */
+        @Override public BinaryMemoryAllocatorChunk chunk() {
+            BinaryMemoryAllocatorChunk holder = holders.get();
+
+            if (holder == null)
+                holders.set(holder = new Chunk());
+
+            return holder;
+        }
+
+        /**
+         * Checks whether a thread-local array is acquired or not.
+         * The function is used by Unit tests.
+         *
+         * @return {@code true} if acquired {@code false} otherwise.
+         */
+        @Override public boolean isAcquired() {
+            BinaryMemoryAllocatorChunk holder = holders.get();
+
+            return holder != null && holder.isAcquired();
+        }
+
+        /**
+         * Memory allocator chunk.
+         */
+        private static class Chunk implements BinaryMemoryAllocatorChunk {
+            /** Data array */
+            private byte[] data;
+
+            /** Max message size detected between checks. */
+            private int maxMsgSize;
+
+            /** Last time array size is checked. */
+            private long lastCheckNanos = System.nanoTime();
+
+            /** Whether the holder is acquired or not. */
+            private boolean acquired;
+
+            /** {@inheritDoc} */
+            @Override public byte[] allocate(int size) {
+                if (acquired)
+                    return new byte[size];
+
+                acquired = true;
+
+                if (data == null || size > data.length)
+                    data = new byte[size];
+
+                return data;
+            }
+
+            /** {@inheritDoc} */
+            @Override public byte[] reallocate(byte[] data, int size) {
+                byte[] newData = new byte[size];
+
+                if (this.data == data)
+                    this.data = newData;
+
+                System.arraycopy(data, 0, newData, 0, data.length);
+
+                return newData;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void release(byte[] data, int maxMsgSize) {
+                if (this.data != data)
+                    return;
 
-    public BinaryMemoryAllocatorChunk chunk() {
-        BinaryMemoryAllocatorChunk holder = holders.get();
+                if (maxMsgSize > this.maxMsgSize)
+                    this.maxMsgSize = maxMsgSize;
 
-        if (holder == null)
-            holders.set(holder = new BinaryMemoryAllocatorChunk());
+                acquired = false;
 
-        return holder;
+                long nowNanos = System.nanoTime();
+
+                if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) {
+                    int halfSize = data.length >> 1;
+
+                    if (this.maxMsgSize < halfSize)
+                        this.data = new byte[halfSize];
+
+                    lastCheckNanos = nowNanos;
+                }
+            }
+
+            /** {@inheritDoc} */
+            @Override public boolean isAcquired() {
+                return acquired;
+            }
+        }
     }
 
-    /**
-     * Checks whether a thread-local array is acquired or not.
-     * The function is used by Unit tests.
-     *
-     * @return {@code true} if acquired {@code false} otherwise.
-     */
-    public boolean isAcquired() {
-        BinaryMemoryAllocatorChunk holder = holders.get();
+    /** */
+    private static class PooledAllocator extends BinaryMemoryAllocator {
+        /** */
+        private final ThreadLocal<DataHoldersPool> holders = 
ThreadLocal.withInitial(DataHoldersPool::new);
+
+        /** {@inheritDoc} */
+        @Override public BinaryMemoryAllocatorChunk chunk() {
+            return new Chunk(holders.get());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isAcquired() {
+            return false;
+        }
+
+        /** */
+        private static class DataHoldersPool {
+            /** */
+            private final ArrayDeque<DataHolder> pool = new 
ArrayDeque<>(POOL_SIZE);
+
+            /** */
+            public synchronized DataHolder acquire() {
+                return pool.isEmpty() ? new DataHolder() : pool.pop();
+            }
+
+            /** */
+            public synchronized void release(DataHolder holder) {
+                if (pool.size() < POOL_SIZE) pool.push(holder);
+            }
+        }
+
+        /** */
+        private static class DataHolder {
+            /** Size history. */
+            private final int[] history = new int[128];
+
+            /** Size history cntr. */
+            private int cntr;
+
+            /** Last time array size is checked. */
+            private long lastCheckNanos = System.nanoTime();
+
+            /** Data array */
+            private byte[] data;
+
+            /** */
+            public byte[] ensureCapacity(int size, boolean copy) {
+                if (data == null)
+                    data = new byte[size];
+                else if (data.length < size)
+                    data = copy ? Arrays.copyOf(data, size) : new byte[size];
+
+                return data;
+            }
+
+            /** */
+            public boolean corresponds(byte[] data) {
+                return data != null && this.data == data;
+            }
+
+            /** */
+            public void adjustSize(int msgSize) {
+                history[cntr % history.length] = msgSize;
+                cntr = cntr == Integer.MAX_VALUE ? 0 : cntr + 1;
+
+                long now = System.nanoTime();
+                if (U.nanosToMillis(now - lastCheckNanos) >= CHECK_FREQ && 
cntr > history.length) {
+                    lastCheckNanos = now;
+
+                    int[] tmp = Arrays.copyOf(history, history.length);
+                    Arrays.sort(tmp);
+                    int adjusted = U.nextPowerOf2(tmp[tmp.length / 2]);
+
+                    if (adjusted < data.length)
+                        data = new byte[adjusted];
+                }
+            }
+        }
+
+        /** */
+        private static class Chunk implements BinaryMemoryAllocatorChunk {
+            /** */
+            private volatile DataHolder holder;
+
+            /** */
+            private final DataHoldersPool pool;
+
+            /** */
+            private Chunk(DataHoldersPool pool) {
+                this.pool = pool;
+            }
+
+            /** {@inheritDoc} */
+            @Override public byte[] allocate(int size) {
+                if (holder != null)
+                    return new byte[size];
+
+                holder = pool.acquire();
+                return holder.ensureCapacity(size, false);
+            }
+
+            /** {@inheritDoc} */
+            @Override public byte[] reallocate(byte[] data, int size) {
+                DataHolder holder0 = holder;
+                if (holder0 != null && holder0.corresponds(data))
+                    return holder0.ensureCapacity(size, true);
+                else
+                    return Arrays.copyOf(data, size);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void release(byte[] data, int msgSize) {
+                DataHolder holder0 = holder;
+                if (holder0 == null || !holder0.corresponds(data))
+                    return;
+
+                holder.adjustSize(msgSize);
+
+                pool.release(holder);
+                holder = null;
+            }
 
-        return holder != null && holder.isAcquired();
+            /** {@inheritDoc} */
+            @Override public boolean isAcquired() {
+                return holder != null;
+            }
+        }
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
index 27570cf..5f2dfee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryMemoryAllocatorChunk.java
@@ -17,50 +17,17 @@
 
 package org.apache.ignite.internal.binary.streams;
 
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-
 /**
  * Memory allocator chunk.
  */
-public class BinaryMemoryAllocatorChunk {
-    /** @see IgniteSystemProperties#IGNITE_MARSHAL_BUFFERS_RECHECK */
-    public static final int DFLT_MARSHAL_BUFFERS_RECHECK = 10000;
-
-    /** Buffer size re-check frequency. */
-    private static final Long CHECK_FREQ = 
Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, DFLT_MARSHAL_BUFFERS_RECHECK);
-
-    /** Data array */
-    private byte[] data;
-
-    /** Max message size detected between checks. */
-    private int maxMsgSize;
-
-    /** Last time array size is checked. */
-    private long lastCheckNanos = System.nanoTime();
-
-    /** Whether the holder is acquired or not. */
-    private boolean acquired;
-
+public interface BinaryMemoryAllocatorChunk {
     /**
      * Allocate.
      *
      * @param size Desired size.
      * @return Data.
      */
-    public byte[] allocate(int size) {
-        if (acquired)
-            return new byte[size];
-
-        acquired = true;
-
-        if (data == null || size > data.length)
-            data = new byte[size];
-
-        return data;
-    }
+    public byte[] allocate(int size);
 
     /**
      * Reallocate.
@@ -69,45 +36,15 @@ public class BinaryMemoryAllocatorChunk {
      * @param size Size.
      * @return New data.
      */
-    public byte[] reallocate(byte[] data, int size) {
-        byte[] newData = new byte[size];
-
-        if (this.data == data)
-            this.data = newData;
-
-        System.arraycopy(data, 0, newData, 0, data.length);
-
-        return newData;
-    }
+    public byte[] reallocate(byte[] data, int size);
 
     /**
      * Shrinks array size if needed.
      */
-    public void release(byte[] data, int maxMsgSize) {
-        if (this.data != data)
-            return;
-
-        if (maxMsgSize > this.maxMsgSize)
-            this.maxMsgSize = maxMsgSize;
-
-        this.acquired = false;
-
-        long nowNanos = System.nanoTime();
-
-        if (U.nanosToMillis(nowNanos - lastCheckNanos) >= CHECK_FREQ) {
-            int halfSize = data.length >> 1;
-
-            if (this.maxMsgSize < halfSize)
-                this.data = new byte[halfSize];
-
-            lastCheckNanos = nowNanos;
-        }
-    }
+    public void release(byte[] data, int maxMsgSize);
 
     /**
      * @return {@code True} if acquired.
      */
-    public boolean isAcquired() {
-        return acquired;
-    }
+    public boolean isAcquired();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index acc0ffe..28330e1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -410,14 +410,15 @@ class TcpClientChannel implements ClientChannel {
      * Process next message from the input stream and complete corresponding 
future.
      */
     private void processNextMessage() throws ClientProtocolError, 
ClientConnectionException {
-        int msgSize = dataInput.readInt();
+        // blocking read a message header not to fall into a busy loop
+        int msgSize = dataInput.readInt(2048);
 
         if (msgSize <= 0)
             throw new ClientProtocolError(String.format("Invalid message size: 
%s", msgSize));
 
         long bytesReadOnStartMsg = dataInput.totalBytesRead();
 
-        long resId = dataInput.readLong();
+        long resId = dataInput.spinReadLong();
 
         int status = 0;
 
@@ -426,11 +427,11 @@ class TcpClientChannel implements ClientChannel {
         BinaryInputStream resIn;
 
         if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
-            short flags = dataInput.readShort();
+            short flags = dataInput.spinReadShort();
 
             if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
-                long topVer = dataInput.readLong();
-                int minorTopVer = dataInput.readInt();
+                long topVer = dataInput.spinReadLong();
+                int minorTopVer = dataInput.spinReadInt();
 
                 srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
 
@@ -439,7 +440,7 @@ class TcpClientChannel implements ClientChannel {
             }
 
             if ((flags & ClientFlag.NOTIFICATION) != 0) {
-                short notificationCode = dataInput.readShort();
+                short notificationCode = dataInput.spinReadShort();
 
                 notificationOp = ClientOperation.fromCode(notificationCode);
 
@@ -448,10 +449,10 @@ class TcpClientChannel implements ClientChannel {
             }
 
             if ((flags & ClientFlag.ERROR) != 0)
-                status = dataInput.readInt();
+                status = dataInput.spinReadInt();
         }
         else
-            status = dataInput.readInt();
+            status = dataInput.spinReadInt();
 
         int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);
 
@@ -460,12 +461,12 @@ class TcpClientChannel implements ClientChannel {
 
         if (status == 0) {
             if (msgSize > hdrSize)
-                res = dataInput.read(msgSize - hdrSize);
+                res = dataInput.spinRead(msgSize - hdrSize);
         }
         else if (status == ClientStatus.SECURITY_VIOLATION)
             err = new ClientAuthorizationException();
         else {
-            resIn = new BinaryHeapInputStream(dataInput.read(msgSize - 
hdrSize));
+            resIn = new BinaryHeapInputStream(dataInput.spinRead(msgSize - 
hdrSize));
 
             String errMsg = ClientUtils.createBinaryReader(null, 
resIn).readString();
 
@@ -713,47 +714,81 @@ class TcpClientChannel implements ClientChannel {
             this.in = in;
         }
 
+        /** Read bytes from the input stream. */
+        public byte[] read(int len) throws ClientConnectionException {
+            byte[] bytes = new byte[len];
+
+            read(bytes, len, 0);
+
+            return bytes;
+        }
+
+        /** Read bytes from the input stream. */
+        public byte[] spinRead(int len) {
+            byte[] bytes = new byte[len];
+
+            read(bytes, len, Integer.MAX_VALUE);
+
+            return bytes;
+        }
+
         /**
          * Read bytes from the input stream to the buffer.
          *
          * @param bytes Bytes buffer.
          * @param len Length.
+         * @param tryReadCnt Number of reads before falling into blocking read.
          */
-        private void read(byte[] bytes, int len) throws 
ClientConnectionException {
-            int bytesNum;
-            int readBytesNum = 0;
-
-            while (readBytesNum < len) {
-                try {
-                    bytesNum = in.read(bytes, readBytesNum, len - 
readBytesNum);
-                }
-                catch (IOException e) {
-                    throw handleIOError(e);
-                }
+        public void read(byte[] bytes, int len, int tryReadCnt) throws 
ClientConnectionException {
+            int offset = 0;
 
-                if (bytesNum < 0)
-                    throw handleIOError(null);
+            try {
+                while (offset < len) {
+                    int toRead;
 
-                readBytesNum += bytesNum;
-            }
+                    if (tryReadCnt == 0)
+                        toRead = len - offset;
+                    else if ((toRead = Math.min(in.available(), len - offset)) 
== 0) {
+                        tryReadCnt--;
 
-            totalBytesRead += readBytesNum;
-        }
+                        continue;
+                    }
 
-        /** Read bytes from the input stream. */
-        public byte[] read(int len) throws ClientConnectionException {
-            byte[] bytes = new byte[len];
+                    int read = in.read(bytes, offset, toRead);
 
-            read(bytes, len);
+                    if (read < 0)
+                        throw handleIOError(null);
 
-            return bytes;
+                    offset += read;
+                    totalBytesRead += read;
+                }
+            }
+            catch (IOException e) {
+                throw handleIOError(e);
+            }
         }
 
         /**
          * Read long value from the input stream.
          */
         public long readLong() throws ClientConnectionException {
-            read(tmpBuf, Long.BYTES);
+            return readLong(0);
+        }
+
+        /**
+         * Read long value from the input stream.
+         */
+        public long spinReadLong() throws ClientConnectionException {
+            return readLong(Integer.MAX_VALUE);
+        }
+
+        /**
+         * Read long value from the input stream.
+         *
+         * @param tryReadCnt Number of reads before falling into blocking read.
+         */
+        private long readLong(int tryReadCnt) throws ClientConnectionException 
{
+            read(tmpBuf, Long.BYTES, tryReadCnt);
 
             return BinaryPrimitives.readLong(tmpBuf, 0);
         }
@@ -762,7 +797,23 @@ class TcpClientChannel implements ClientChannel {
          * Read int value from the input stream.
          */
         public int readInt() throws ClientConnectionException {
-            read(tmpBuf, Integer.BYTES);
+            return readInt(0);
+        }
+
+        /**
+         * Read int value from the input stream.
+         */
+        public int spinReadInt() throws ClientConnectionException {
+            return readInt(Integer.MAX_VALUE);
+        }
+
+        /**
+         * Read int value from the input stream.
+         *
+         * @param tryReadCnt Number of reads before falling into blocking read.
+         */
+        private int readInt(int tryReadCnt) throws ClientConnectionException {
+            read(tmpBuf, Integer.BYTES, tryReadCnt);
 
             return BinaryPrimitives.readInt(tmpBuf, 0);
         }
@@ -771,7 +822,23 @@ class TcpClientChannel implements ClientChannel {
          * Read short value from the input stream.
          */
         public short readShort() throws ClientConnectionException {
-            read(tmpBuf, Short.BYTES);
+            return readShort(0);
+        }
+
+        /**
+         * Read short value from the input stream.
+         */
+        public short spinReadShort() throws ClientConnectionException {
+            return readShort(Integer.MAX_VALUE);
+        }
+
+        /**
+         * Read short value from the input stream.
+         *
+         * @param tryReadCnt Number of reads before falling into blocking read.
+         */
+        public short readShort(int tryReadCnt) throws 
ClientConnectionException {
+            read(tmpBuf, Short.BYTES, tryReadCnt);
 
             return BinaryPrimitives.readShort(tmpBuf, 0);
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
index c3782ef..4dfc117 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java
@@ -27,7 +27,7 @@ public interface ClientListenerMessageParser {
      * @param msg Message.
      * @return Request.
      */
-    ClientListenerRequest decode(byte[] msg);
+    ClientListenerRequest decode(ClientMessage msg);
 
     /**
      * Encode response to byte array.
@@ -35,7 +35,7 @@ public interface ClientListenerMessageParser {
      * @param resp Response.
      * @return Message.
      */
-    byte[] encode(ClientListenerResponse resp);
+    ClientMessage encode(ClientListenerResponse resp);
 
     /**
      * Decode command type. Allows to recognize the command (message type) 
without decoding the entire message.
@@ -43,7 +43,7 @@ public interface ClientListenerMessageParser {
      * @param msg Message.
      * @return Command type.
      */
-    int decodeCommandType(byte[] msg);
+    int decodeCommandType(ClientMessage msg);
 
     /**
      * Decode request Id. Allows to recognize the request Id, if any, without 
decoding the entire message.
@@ -51,5 +51,5 @@ public interface ClientListenerMessageParser {
      * @param msg Message.
      * @return Request Id.
      */
-    long decodeRequestId(byte[] msg);
+    long decodeRequestId(ClientMessage msg);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index 369eb2d..393383d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -53,7 +53,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Client message listener.
  */
-public class ClientListenerNioListener extends 
GridNioServerListenerAdapter<byte[]> {
+public class ClientListenerNioListener extends 
GridNioServerListenerAdapter<ClientMessage> {
     /** ODBC driver handshake code. */
     public static final byte ODBC_CLIENT = 0;
 
@@ -139,7 +139,7 @@ public class ClientListenerNioListener extends 
GridNioServerListenerAdapter<byte
     }
 
     /** {@inheritDoc} */
-    @Override public void onMessage(GridNioSession ses, byte[] msg) {
+    @Override public void onMessage(GridNioSession ses, ClientMessage msg) {
         assert msg != null;
 
         ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
@@ -214,9 +214,7 @@ public class ClientListenerNioListener extends 
GridNioServerListenerAdapter<byte
                         ", resp=" + resp.status() + ']');
                 }
 
-                byte[] outMsg = parser.encode(resp);
-
-                GridNioFuture<?> fut = ses.send(outMsg);
+                    GridNioFuture<?> fut = ses.send(parser.encode(resp));
 
                 fut.listen(f -> {
                     if (f.error() == null)
@@ -289,7 +287,7 @@ public class ClientListenerNioListener extends 
GridNioServerListenerAdapter<byte
      * @param ses Session.
      * @param msg Message bytes.
      */
-    private void onHandshake(GridNioSession ses, byte[] msg) {
+    private void onHandshake(GridNioSession ses, ClientMessage msg) {
         BinaryContext ctx = new 
BinaryContext(BinaryCachingMetadataHandler.create(), new IgniteConfiguration(), 
null);
 
         BinaryMarshaller marsh = new BinaryMarshaller();
@@ -298,7 +296,7 @@ public class ClientListenerNioListener extends 
GridNioServerListenerAdapter<byte
 
         ctx.configure(marsh);
 
-        BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new 
BinaryHeapInputStream(msg), null, true);
+        BinaryReaderExImpl reader = new BinaryReaderExImpl(ctx, new 
BinaryHeapInputStream(msg.payload()), null, true);
 
         byte cmd = reader.readByte();
 
@@ -373,7 +371,7 @@ public class ClientListenerNioListener extends 
GridNioServerListenerAdapter<byte
                 writer.writeInt(ClientStatus.FAILED);
         }
 
-        ses.send(writer.array());
+        ses.send(new ClientMessage(writer.array()));
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
similarity index 56%
rename from 
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
index 48c7367..9c01770 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioMessageParser.java
@@ -19,14 +19,16 @@ package org.apache.ignite.internal.processors.odbc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.nio.GridNioParser;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
 
 /**
- * This class implements stream parser based on {@link 
ClientListenerNioServerBuffer}.
+ * This class implements stream parser.
  * <p>
  * The rule for this parser is that every message sent over the stream is 
prepended with
  * 4-byte integer header containing message size. So, the stream structure is 
as follows:
@@ -36,45 +38,57 @@ import 
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
  *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
  * </pre>
  */
-public class ClientListenerBufferedParser implements GridNioParser {
-    /** Buffer metadata key. */
-    private static final int BUF_META_KEY = 
GridNioSessionMetaKey.nextUniqueKey();
+public class ClientListenerNioMessageParser implements GridNioParser {
+    /** Message metadata key. */
+    static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
-    /** {@inheritDoc} */
-    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws 
IOException, IgniteCheckedException {
-        ClientListenerNioServerBuffer nioBuf = ses.meta(BUF_META_KEY);
-
-        // Decode for a given session is called per one thread, so there 
should not be any concurrency issues.
-        // However, we make some additional checks.
-        if (nioBuf == null) {
-            nioBuf = new ClientListenerNioServerBuffer();
-
-            ClientListenerNioServerBuffer old = ses.addMeta(BUF_META_KEY, 
nioBuf);
+    /** Reader metadata key. */
+    static final int READER_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
 
-            assert old == null;
-        }
+    /** */
+    private final IgniteLogger log;
 
-        return nioBuf.read(buf);
+    /** */
+    public ClientListenerNioMessageParser(IgniteLogger log) {
+        this.log = log;
     }
 
     /** {@inheritDoc} */
-    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws 
IOException, IgniteCheckedException {
-        byte[] msg0 = (byte[])msg;
+    @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws 
IOException, IgniteCheckedException {
+        Message msg = ses.removeMeta(MSG_META_KEY);
+
+        try {
+            if (msg == null)
+                msg = new ClientMessage();
 
-        ByteBuffer res = ByteBuffer.allocate(msg0.length + 4);
+            boolean finished = false;
 
-        res.order(ByteOrder.LITTLE_ENDIAN);
+            if (buf.hasRemaining())
+                finished = msg.readFrom(buf, null);
 
-        res.putInt(msg0.length);
-        res.put(msg0);
+            if (finished)
+                return msg;
+            else {
+                ses.addMeta(MSG_META_KEY, msg);
 
-        res.flip();
+                return null;
+            }
+        }
+        catch (Throwable e) {
+            U.error(log, "Failed to read message [msg=" + msg +
+                    ", buf=" + buf + ", ses=" + ses + "]", e);
+
+            throw e;
+        }
+    }
 
-        return res;
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws 
IOException, IgniteCheckedException {
+        throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return ClientListenerBufferedParser.class.getSimpleName();
+        return ClientListenerNioMessageParser.class.getSimpleName();
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java
deleted file mode 100644
index ba9d6bf..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client NIO server buffer.
- */
-public class ClientListenerNioServerBuffer {
-    /** Current message data. */
-    private byte[] data;
-
-    /** Count of received bytes of the current message. */
-    private int cnt = -4;
-
-    /** Current message size. */
-    private int msgSize;
-
-    /**
-     * Reset buffer state.
-     */
-    public void reset() {
-        msgSize = 0;
-        cnt = -4;
-        data = null;
-    }
-
-    /**
-     * Checks whether the byte array is filled.
-     *
-     * @return Flag indicating whether byte array is filled or not.
-     */
-    public boolean isFilled() {
-        return cnt > 0 && cnt == msgSize;
-    }
-
-    /**
-     * Get data withing the buffer.
-     *
-     * @return Data.
-     */
-    public byte[] data() {
-        return data;
-    }
-
-    /**
-     * @param buf Buffer.
-     * @return Message bytes or {@code null} if message is not fully read yet.
-     * @throws IgniteCheckedException If failed to parse message.
-     */
-    @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException 
{
-        if (cnt < 0) {
-            for (; cnt < 0 && buf.hasRemaining(); cnt++)
-                msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt));
-
-            if (cnt < 0)
-                return null;
-
-            // If count is 0 then message size should be inited.
-            if (msgSize <= 0)
-                throw new IgniteCheckedException("Invalid message size: " + 
msgSize);
-
-            data = new byte[msgSize];
-        }
-
-        assert msgSize > 0;
-        assert cnt >= 0;
-
-        int remaining = buf.remaining();
-
-        // If there are more bytes in buffer.
-        if (remaining > 0) {
-            int missing = msgSize - cnt;
-
-            // Read only up to message size.
-            if (missing > 0) {
-                int len = missing < remaining ? missing : remaining;
-
-                buf.get(data, cnt, len);
-
-                cnt += len;
-            }
-        }
-
-        if (cnt == msgSize) {
-            byte[] data0 = data;
-
-            reset();
-
-            return data0;
-        }
-
-        return null;
-    }
-}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
index ecebc4a..517d213 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
@@ -86,13 +86,13 @@ public class ClientListenerProcessor extends 
GridProcessorAdapter {
     private static final int DFLT_SELECTOR_CNT = Math.min(4, 
Runtime.getRuntime().availableProcessors());
 
     /** Default TCP direct buffer flag. */
-    private static final boolean DFLT_TCP_DIRECT_BUF = false;
+    private static final boolean DFLT_TCP_DIRECT_BUF = true;
 
     /** Busy lock. */
     private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** TCP Server. */
-    private GridNioServer<byte[]> srv;
+    private GridNioServer<ClientMessage> srv;
 
     /** Executor service. */
     private ExecutorService execSvc;
@@ -158,14 +158,16 @@ public class ClientListenerProcessor extends 
GridProcessorAdapter {
 
                 long idleTimeout = cliConnCfg.getIdleTimeout();
 
+                int selectorCnt = cliConnCfg.getSelectorCount();
+
                 for (int port = cliConnCfg.getPort(); port <= portTo && port 
<= 65535; port++) {
                     try {
-                        GridNioServer<byte[]> srv0 = 
GridNioServer.<byte[]>builder()
+                        srv = GridNioServer.<ClientMessage>builder()
                             .address(hostAddr)
                             .port(port)
                             .listener(new ClientListenerNioListener(ctx, 
busyLock, cliConnCfg))
                             .logger(log)
-                            .selectorCount(DFLT_SELECTOR_CNT)
+                            .selectorCount(selectorCnt)
                             .igniteInstanceName(ctx.igniteInstanceName())
                             .serverName("client-listener")
                             .tcpNoDelay(cliConnCfg.isTcpNoDelay())
@@ -174,12 +176,10 @@ public class ClientListenerProcessor extends 
GridProcessorAdapter {
                             
.socketSendBufferSize(cliConnCfg.getSocketSendBufferSize())
                             
.socketReceiveBufferSize(cliConnCfg.getSocketReceiveBufferSize())
                             .filters(filters)
-                            .directMode(false)
+                            .directMode(true)
                             .idleTimeout(idleTimeout > 0 ? idleTimeout : 
Long.MAX_VALUE)
                             .build();
 
-                        srv = srv0;
-
                         ctx.ports().registerPort(port, IgnitePortProtocol.TCP, 
getClass());
 
                         if (log.isInfoEnabled())
@@ -287,14 +287,14 @@ public class ClientListenerProcessor extends 
GridProcessorAdapter {
                 ClientListenerConnectionContext connCtx = 
ses.meta(ClientListenerNioListener.CONN_CTX_META_KEY);
 
                 if (connCtx != null && connCtx.parser() != null && 
connCtx.handler().isCancellationSupported()) {
-                    byte[] inMsg;
+                    ClientMessage inMsg;
 
                     int cmdType;
 
                     long reqId;
 
                     try {
-                        inMsg = (byte[])msg;
+                        inMsg = (ClientMessage)msg;
 
                         cmdType = connCtx.parser().decodeCommandType(inMsg);
 
@@ -324,7 +324,7 @@ public class ClientListenerProcessor extends 
GridProcessorAdapter {
             }
         };
 
-        GridNioFilter codecFilter = new GridNioCodecFilter(new 
ClientListenerBufferedParser(), log, false);
+        GridNioFilter codecFilter = new GridNioCodecFilter(new 
ClientListenerNioMessageParser(log), log, true);
 
         if (cliConnCfg.isSslEnabled()) {
             Factory<SSLContext> sslCtxFactory = 
cliConnCfg.isUseIgniteSslContextFactory() ?
@@ -337,7 +337,7 @@ public class ClientListenerProcessor extends 
GridProcessorAdapter {
             GridNioSslFilter sslFilter = new 
GridNioSslFilter(sslCtxFactory.create(),
                 true, ByteOrder.nativeOrder(), log);
 
-            sslFilter.directMode(false);
+            sslFilter.directMode(true);
 
             boolean auth = cliConnCfg.isSslClientAuth();
 
@@ -411,8 +411,7 @@ public class ClientListenerProcessor extends 
GridProcessorAdapter {
     private static String clientConnectionDescription(
         GridNioSession ses,
         ClientListenerConnectionContext ctx
-    )
-    {
+    ) {
         AuthorizationContext authCtx = ctx.authorizationContext();
 
         StringBuilder sb = new StringBuilder();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
new file mode 100644
index 0000000..f1176d6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientMessage.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+@IgniteCodeGeneratingFail
+public class ClientMessage implements Message, Externalizable {
+    /** */
+    private static final long serialVersionUID = -4609408156037304495L;
+
+    /** */
+    private byte[] data;
+
+    /** */
+    private BinaryHeapOutputStream stream;
+
+    /** */
+    private int cnt = -4;
+
+    /** */
+    private int msgSize;
+
+    /** */
+    public ClientMessage() {}
+
+    /** */
+    public ClientMessage(byte[] data) {
+        this.data = data;
+    }
+
+    /** */
+    public ClientMessage(BinaryHeapOutputStream stream) {
+        this.stream = stream;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter ignored) {
+        assert stream != null || data != null;
+
+        byte[] data = stream != null ? stream.array() : this.data;
+        int msgSize = stream != null ? stream.position() : data.length;
+
+        if (cnt < 0) {
+            for (; cnt < 0 && buf.hasRemaining(); cnt++)
+                buf.put((byte) ((msgSize >> (8 * (4 + cnt))) & 0xFF));
+
+            if (cnt < 0)
+                return false;
+        }
+
+        assert cnt >= 0;
+        assert msgSize > 0;
+
+        int remaining = buf.remaining();
+
+        if (remaining > 0) {
+            int missing = msgSize - cnt;
+
+            if (missing > 0) {
+                int len = Math.min(missing, remaining);
+
+                buf.put(data, cnt, len);
+
+                cnt += len;
+            }
+        }
+
+        if (cnt == msgSize) {
+            cnt = -4;
+
+            if (stream != null) {
+                U.closeQuiet(stream);
+
+                stream = null;
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        if (cnt < 0) {
+            for (; cnt < 0 && buf.hasRemaining(); cnt++)
+                msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt));
+
+            if (cnt < 0)
+                return false;
+
+            data = new byte[msgSize];
+        }
+
+        assert data != null;
+        assert cnt >= 0;
+        assert msgSize > 0;
+
+        int remaining = buf.remaining();
+
+        if (remaining > 0) {
+            int missing = msgSize - cnt;
+
+            if (missing > 0) {
+                int len = Math.min(missing, remaining);
+
+                buf.get(data, cnt, len);
+
+                cnt += len;
+            }
+        }
+
+        if (cnt == msgSize) {
+            cnt = -4;
+            msgSize = 0;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return Short.MIN_VALUE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op
+    }
+
+    /**
+     * @return Message payload.
+     */
+    public byte[] payload() {
+        if (stream != null) {
+            data = stream.arrayCopy();
+            U.closeQuiet(stream);
+            stream = null;
+        }
+
+        return data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        byte[] data = payload();
+        out.writeInt(data.length);
+        out.write(data);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        data = new byte[in.readInt()];
+        in.read(data, 0, data.length);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 0d9ea39..64cf609 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -216,9 +216,7 @@ public class JdbcConnectionContext extends 
ClientListenerAbstractConnectionConte
                     if (log.isDebugEnabled())
                         log.debug("Async response: [resp=" + resp.status() + 
']');
 
-                    byte[] outMsg = parser.encode(resp);
-
-                    ses.send(outMsg);
+                    ses.send(parser.encode(resp));
                 }
             }
         };
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
index fdf7a12..255012b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
@@ -29,6 +29,7 @@ import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
 import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
 
 /**
  * JDBC message parser.
@@ -60,8 +61,8 @@ public class JdbcMessageParser implements 
ClientListenerMessageParser {
      * @param msg Message.
      * @return Reader.
      */
-    protected BinaryReaderExImpl createReader(byte[] msg) {
-        BinaryInputStream stream = new BinaryHeapInputStream(msg);
+    protected BinaryReaderExImpl createReader(ClientMessage msg) {
+        BinaryInputStream stream = new BinaryHeapInputStream(msg.payload());
 
         return new BinaryReaderExImpl(binCtx, stream, 
ctx.config().getClassLoader(), true);
     }
@@ -76,7 +77,7 @@ public class JdbcMessageParser implements 
ClientListenerMessageParser {
     }
 
     /** {@inheritDoc} */
-    @Override public ClientListenerRequest decode(byte[] msg) {
+    @Override public ClientListenerRequest decode(ClientMessage msg) {
         assert msg != null;
 
         BinaryReaderExImpl reader = createReader(msg);
@@ -85,7 +86,7 @@ public class JdbcMessageParser implements 
ClientListenerMessageParser {
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] encode(ClientListenerResponse msg) {
+    @Override public ClientMessage encode(ClientListenerResponse msg) {
         assert msg != null;
 
         assert msg instanceof JdbcResponse;
@@ -96,20 +97,20 @@ public class JdbcMessageParser implements 
ClientListenerMessageParser {
 
         res.writeBinary(writer, protoCtx);
 
-        return writer.array();
+        return new ClientMessage(writer.array());
     }
 
     /** {@inheritDoc} */
-    @Override public int decodeCommandType(byte[] msg) {
+    @Override public int decodeCommandType(ClientMessage msg) {
         assert msg != null;
 
-        return JdbcRequest.readType(msg);
+        return JdbcRequest.readType(msg.payload());
     }
 
     /** {@inheritDoc} */
-    @Override public long decodeRequestId(byte[] msg) {
+    @Override public long decodeRequestId(ClientMessage msg) {
         assert msg != null;
 
-        return JdbcRequest.readRequestId(msg);
+        return JdbcRequest.readRequestId(msg.payload());
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
index 0cc22d1..74def0d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcConnectionContext.java
@@ -157,9 +157,7 @@ public class OdbcConnectionContext extends 
ClientListenerAbstractConnectionConte
                     if (log.isDebugEnabled())
                         log.debug("Async response: [resp=" + resp.status() + 
']');
 
-                    byte[] outMsg = parser.encode(resp);
-
-                    ses.send(outMsg);
+                    ses.send(parser.encode(resp));
                 }
             }
         };
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
index 0e66c93..c9b779f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
 import 
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
 import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
 import org.jetbrains.annotations.NotNull;
 
@@ -78,10 +79,10 @@ public class OdbcMessageParser implements 
ClientListenerMessageParser {
     }
 
     /** {@inheritDoc} */
-    @Override public ClientListenerRequest decode(byte[] msg) {
+    @Override public ClientListenerRequest decode(ClientMessage msg) {
         assert msg != null;
 
-        BinaryInputStream stream = new BinaryHeapInputStream(msg);
+        BinaryInputStream stream = new BinaryHeapInputStream(msg.payload());
 
         BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), 
stream, ctx.config().getClassLoader(), true);
 
@@ -242,7 +243,7 @@ public class OdbcMessageParser implements 
ClientListenerMessageParser {
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] encode(ClientListenerResponse msg0) {
+    @Override public ClientMessage encode(ClientListenerResponse msg0) {
         assert msg0 != null;
 
         assert msg0 instanceof OdbcResponse;
@@ -263,13 +264,13 @@ public class OdbcMessageParser implements 
ClientListenerMessageParser {
         if (msg.status() != ClientListenerResponse.STATUS_SUCCESS) {
             writer.writeString(msg.error());
 
-            return writer.array();
+            return new ClientMessage(writer.array());
         }
 
         Object res0 = msg.response();
 
         if (res0 == null)
-            return writer.array();
+            return new ClientMessage(writer.array());
         else if (res0 instanceof OdbcQueryExecuteResult) {
             OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0;
 
@@ -406,19 +407,19 @@ public class OdbcMessageParser implements 
ClientListenerMessageParser {
         else
             assert false : "Should not reach here.";
 
-        return writer.array();
+        return new ClientMessage(writer.array());
     }
 
     /** {@inheritDoc} */
-    @Override public int decodeCommandType(byte[] msg) {
+    @Override public int decodeCommandType(ClientMessage msg) {
         assert msg != null;
 
-        return msg[0];
+        return msg.payload()[0];
     }
 
 
     /** {@inheritDoc} */
-    @Override public long decodeRequestId(byte[] msg) {
+    @Override public long decodeRequestId(ClientMessage msg) {
         return 0;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index d3b10c5..66b9e89 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -23,10 +23,12 @@ import 
org.apache.ignite.internal.binary.GridBinaryMarshaller;
 import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator;
 import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.odbc.ClientListenerMessageParser;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.ClientMessage;
 import 
org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeGetRequest;
 import 
org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNameGetRequest;
 import 
org.apache.ignite.internal.processors.platform.client.binary.ClientBinaryTypeNamePutRequest;
@@ -284,10 +286,10 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
     }
 
     /** {@inheritDoc} */
-    @Override public ClientListenerRequest decode(byte[] msg) {
+    @Override public ClientListenerRequest decode(ClientMessage msg) {
         assert msg != null;
 
-        BinaryInputStream inStream = new BinaryHeapInputStream(msg);
+        BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
 
         // skipHdrCheck must be true (we have 103 op code).
         BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), 
inStream,
@@ -473,10 +475,10 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] encode(ClientListenerResponse resp) {
+    @Override public ClientMessage encode(ClientListenerResponse resp) {
         assert resp != null;
 
-        BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32);
+        BinaryHeapOutputStream outStream = new BinaryHeapOutputStream(32, 
BinaryMemoryAllocator.POOLED.chunk());
 
         BinaryRawWriterEx writer = marsh.writer(outStream);
 
@@ -484,20 +486,20 @@ public class ClientMessageParser implements 
ClientListenerMessageParser {
 
         ((ClientOutgoingMessage)resp).encode(ctx, writer);
 
-        return outStream.arrayCopy();
+        return new ClientMessage(outStream);
     }
 
     /** {@inheritDoc} */
-    @Override public int decodeCommandType(byte[] msg) {
+    @Override public int decodeCommandType(ClientMessage msg) {
         assert msg != null;
 
-        BinaryInputStream inStream = new BinaryHeapInputStream(msg);
+        BinaryInputStream inStream = new BinaryHeapInputStream(msg.payload());
 
         return inStream.readShort();
     }
 
     /** {@inheritDoc} */
-    @Override public long decodeRequestId(byte[] msg) {
+    @Override public long decodeRequestId(ClientMessage msg) {
         return 0;
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
index a914078..b3c8cda 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataInput.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
 import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
index 93f1a95..a165a07 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/io/GridUnsafeDataOutput.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocatorChunk.DFLT_MARSHAL_BUFFERS_RECHECK;
+import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.DFLT_MARSHAL_BUFFERS_RECHECK;
 import static org.apache.ignite.internal.util.GridUnsafe.BIG_ENDIAN;
 import static org.apache.ignite.internal.util.GridUnsafe.BYTE_ARR_OFF;
 import static org.apache.ignite.internal.util.GridUnsafe.CHAR_ARR_OFF;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 9c0c6b1..41574ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1401,16 +1401,7 @@ public class GridNioServer<T> {
 
             GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
 
-            MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-
-            if (writer == null) {
-                try {
-                    ses.addMeta(MSG_WRITER.ordinal(), writer = 
writerFactory.writer(ses));
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IOException("Failed to create message writer.", 
e);
-                }
-            }
+            MessageWriter writer = messageWriter(ses);
 
             boolean handshakeFinished = sslFilter.lock(ses);
 
@@ -1659,16 +1650,7 @@ public class GridNioServer<T> {
             ByteBuffer buf = ses.writeBuffer();
             SessionWriteRequest req = ses.removeMeta(NIO_OPERATION.ordinal());
 
-            MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
-
-            if (writer == null) {
-                try {
-                    ses.addMeta(MSG_WRITER.ordinal(), writer = 
writerFactory.writer(ses));
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IOException("Failed to create message writer.", 
e);
-                }
-            }
+            MessageWriter writer = messageWriter(ses);
 
             if (req == null) {
                 req = systemMessage(ses);
@@ -1739,6 +1721,25 @@ public class GridNioServer<T> {
                 buf.clear();
         }
 
+        /** */
+        @Nullable private MessageWriter 
messageWriter(GridSelectorNioSessionImpl ses) throws IOException {
+            if (writerFactory == null)
+                return null;
+
+            MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
+
+            if (writer == null) {
+                try {
+                    ses.addMeta(MSG_WRITER.ordinal(), writer = 
writerFactory.writer(ses));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to create message writer.", 
e);
+                }
+            }
+
+            return writer;
+        }
+
         /**
          * @param writer Customizer of writing.
          * @param buf Buffer to write.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
index 8e6c966..1c92e8d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
@@ -107,7 +107,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.INSTANCE;
+import static 
org.apache.ignite.internal.binary.streams.BinaryMemoryAllocator.THREAD_LOCAL;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertNotEquals;
 
@@ -2834,28 +2834,28 @@ public class BinaryMarshallerSelfTest extends 
GridCommonAbstractTest {
     @Test
     public void testThreadLocalArrayReleased() throws Exception {
         // Checking the writer directly.
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
 
         BinaryMarshaller marsh = binaryMarshaller();
 
         try (BinaryWriterExImpl writer = new 
BinaryWriterExImpl(binaryContext(marsh))) {
-            assertEquals(true, INSTANCE.isAcquired());
+            assertEquals(true, THREAD_LOCAL.isAcquired());
 
             writer.writeString("Thread local test");
 
             writer.array();
 
-            assertEquals(true, INSTANCE.isAcquired());
+            assertEquals(true, THREAD_LOCAL.isAcquired());
         }
 
         // Checking the binary marshaller.
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
 
         marsh = binaryMarshaller();
 
         marsh.marshal(new SimpleObject());
 
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
 
         marsh = binaryMarshaller();
 
@@ -2867,7 +2867,7 @@ public class BinaryMarshallerSelfTest extends 
GridCommonAbstractTest {
 
         BinaryObject binaryObj = builder.build();
 
-        assertEquals(false, INSTANCE.isAcquired());
+        assertEquals(false, THREAD_LOCAL.isAcquired());
     }
 
     /**

Reply via email to