This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 338165a  IGNITE-13496 Java thin: make async API non-blocking with 
GridNioServer
338165a is described below

commit 338165afadd3b6979b4655ee2f03f3b9c2228236
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Wed Dec 2 19:39:25 2020 +0300

    IGNITE-13496 Java thin: make async API non-blocking with GridNioServer
    
    Refactor Java Thin Client to use GridNioServer in client mode:
    * Client threads are never blocked
    * Single worker thread is shared across all connections within 
`IgniteClient`
    
    Benchmark results (i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275):
    
    Before
    Benchmark                         Mode  Cnt      Score      Error  Units
    JmhThinClientCacheBenchmark.get  thrpt   10  65916.805 ± 2118.954  ops/s
    JmhThinClientCacheBenchmark.put  thrpt   10  62304.444 ± 2521.371  ops/s
    
    After
    Benchmark                         Mode  Cnt      Score      Error  Units
    JmhThinClientCacheBenchmark.get  thrpt   10  92501.557 ± 1380.384  ops/s
    JmhThinClientCacheBenchmark.put  thrpt   10  82907.446 ± 7572.537  ops/s
---
 .../jmh/thin/JmhThinClientAbstractBenchmark.java   | 135 ++++
 .../jmh/thin/JmhThinClientCacheBenchmark.java      |  81 +++
 .../streams/BinaryByteBufferInputStream.java       |  91 +--
 .../internal/client/thin/ClientComputeImpl.java    |   7 +-
 .../internal/client/thin/ClientSslUtils.java       | 293 +++++++++
 .../internal/client/thin/NotificationListener.java |   4 +-
 .../internal/client/thin/PayloadInputChannel.java  |   8 +-
 .../internal/client/thin/ReliableChannel.java      |  63 +-
 .../internal/client/thin/TcpClientChannel.java     | 679 +++------------------
 .../internal/client/thin/TcpIgniteClient.java      |  27 +-
 .../ClientConnection.java}                         |  25 +-
 .../thin/io/ClientConnectionMultiplexer.java       |  52 ++
 .../ClientConnectionStateHandler.java}             |  19 +-
 .../client/thin/io/ClientMessageDecoder.java       |  92 +++
 .../ClientMessageHandler.java}                     |  19 +-
 .../io/gridnioserver/GridNioClientConnection.java  |  93 +++
 .../GridNioClientConnectionMultiplexer.java        | 147 +++++
 .../io/gridnioserver/GridNioClientListener.java    |  73 +++
 .../thin/io/gridnioserver/GridNioClientParser.java |  59 ++
 .../ignite/client/ConnectToStartingNodeTest.java   |  18 +-
 .../apache/ignite/client/SslParametersTest.java    |   4 +-
 .../internal/client/thin/ReliableChannelTest.java  |   9 +-
 .../ThinClientAbstractPartitionAwarenessTest.java  |   9 +-
 ...lientPartitionAwarenessResourceReleaseTest.java |  14 +-
 24 files changed, 1228 insertions(+), 793 deletions(-)

diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java
new file mode 100644
index 0000000..6b6dc53
--- /dev/null
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.stream.IntStream;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * Base class for thin client benchmarks.
+ */
+@State(Scope.Benchmark)
+public abstract class JmhThinClientAbstractBenchmark extends 
JmhAbstractBenchmark {
+    /** Property: nodes count. */
+    protected static final String PROP_DATA_NODES = 
"ignite.jmh.thin.dataNodes";
+
+    /** Default amount of nodes. */
+    protected static final int DFLT_DATA_NODES = 4;
+
+    /** Items count. */
+    protected static final int CNT = 1000;
+
+    /** Cache value. */
+    protected static final byte[] PAYLOAD = new byte[1000];
+
+    /** IP finder shared across nodes. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Default cache name. */
+    private static final String DEFAULT_CACHE_NAME = "default";
+
+    /** Target node. */
+    protected Ignite node;
+
+    /** Target cache. */
+    protected ClientCache<Integer, byte[]> cache;
+
+    /** Thin client. */
+    protected IgniteClient client;
+
+    /**
+     * Setup routine. Child classes must invoke this method first.
+     *
+     */
+    @Setup
+    public void setup() {
+        System.out.println();
+        System.out.println("--------------------");
+        System.out.println("IGNITE BENCHMARK INFO: ");
+        System.out.println("\tdata nodes:                 " + 
intProperty(PROP_DATA_NODES, DFLT_DATA_NODES));
+        System.out.println("--------------------");
+        System.out.println();
+
+        int nodesCnt = intProperty(PROP_DATA_NODES, DFLT_DATA_NODES);
+
+        A.ensure(nodesCnt >= 1, "nodesCnt >= 1");
+
+        node = Ignition.start(configuration("node0"));
+
+        for (int i = 1; i < nodesCnt; i++)
+            Ignition.start(configuration("node" + i));
+
+        String[] addrs = IntStream
+                .range(10800, 10800 + nodesCnt)
+                .mapToObj(p -> "127.0.0.1:" + p)
+                .toArray(String[]::new);
+
+        ClientConfiguration cfg = new ClientConfiguration()
+                .setAddresses(addrs)
+                .setPartitionAwarenessEnabled(true);
+
+        client = Ignition.startClient(cfg);
+
+        cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        System.out.println("Loading test data...");
+
+        for (int i = 0; i < CNT; i++)
+            cache.put(i, PAYLOAD);
+
+        System.out.println("Test data loaded: " + CNT);
+    }
+
+    /**
+     * Tear down routine.
+     *
+     */
+    @TearDown
+    public void tearDown() throws Exception {
+        client.close();
+        Ignition.stopAll(true);
+    }
+
+    /**
+     * Create Ignite configuration.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @return Configuration.
+     */
+    protected IgniteConfiguration configuration(String igniteInstanceName) {
+
+        return new IgniteConfiguration()
+                .setIgniteInstanceName(igniteInstanceName)
+                .setLocalHost("127.0.0.1")
+                .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+    }
+}
diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
new file mode 100644
index 0000000..88e6a87
--- /dev/null
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Mode;
+
+/**
+ * Thin client cache benchmark.
+ *
+ * Results on i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275:
+ * Benchmark                         Mode  Cnt      Score      Error  Units
+ * JmhThinClientCacheBenchmark.get  thrpt   10  92501.557 ± 1380.384  ops/s
+ * JmhThinClientCacheBenchmark.put  thrpt   10  82907.446 ± 7572.537  ops/s
+ *
+ * JmhThinClientCacheBenchmark.get  avgt    10  41.505 ± 1.018        us/op
+ * JmhThinClientCacheBenchmark.put  avgt    10  44.623 ± 0.779        us/op
+ */
+public class JmhThinClientCacheBenchmark extends 
JmhThinClientAbstractBenchmark {
+    /**
+     * Cache put benchmark.
+     */
+    @Benchmark
+    public void put() {
+        int key = ThreadLocalRandom.current().nextInt(CNT);
+
+        cache.put(key, PAYLOAD);
+    }
+
+    /**
+     * Cache get benchmark.
+     */
+    @Benchmark
+    public Object get() {
+        int key = ThreadLocalRandom.current().nextInt(CNT);
+
+        return cache.get(key);
+    }
+
+    /**
+     * Run benchmarks.
+     *
+     * @param args Arguments.
+     * @throws Exception If failed.
+     */
+    public static void main(String[] args) throws Exception {
+        JmhIdeBenchmarkRunner runner = JmhIdeBenchmarkRunner.create()
+                .forks(1)
+                .threads(4)
+                .benchmarks(JmhThinClientCacheBenchmark.class.getSimpleName())
+                .jvmArguments("-Xms4g", "-Xmx4g");
+
+        runner
+                .benchmarkModes(Mode.Throughput)
+                .run();
+
+        runner
+                .benchmarkModes(Mode.AverageTime)
+                .outputTimeUnit(TimeUnit.MICROSECONDS)
+                .run();
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java
index d277948..fe138e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java
@@ -18,14 +18,14 @@
 package org.apache.ignite.internal.binary.streams;
 
 import java.nio.ByteBuffer;
-import org.apache.ignite.binary.BinaryObjectException;
+import java.util.Arrays;
 
 /**
- *
+ * Input stream over {@link ByteBuffer}.
  */
 public class BinaryByteBufferInputStream implements BinaryInputStream {
     /** */
-    private ByteBuffer buf;
+    private final ByteBuffer buf;
 
     /**
      * @param buf Buffer to wrap.
@@ -44,15 +44,11 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public byte readByte() {
-        ensureHasData(1);
-
         return buf.get();
     }
 
     /** {@inheritDoc} */
     @Override public byte[] readByteArray(int cnt) {
-        ensureHasData(cnt);
-
         byte[] data = new byte[cnt];
 
         buf.get(data);
@@ -62,22 +58,16 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public int read(byte[] arr, int off, int cnt) {
-        ensureHasData(cnt);
-
         return 0;
     }
 
     /** {@inheritDoc} */
     @Override public boolean readBoolean() {
-        ensureHasData(1);
-
-        return false;
+        return readByte() == 1;
     }
 
     /** {@inheritDoc} */
     @Override public boolean[] readBooleanArray(int cnt) {
-        ensureHasData(cnt);
-
         boolean[] res = new boolean[cnt];
 
         for (int i = 0; i < cnt; i++)
@@ -88,15 +78,11 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public short readShort() {
-        ensureHasData(2);
-
         return buf.getShort();
     }
 
     /** {@inheritDoc} */
     @Override public short[] readShortArray(int cnt) {
-        ensureHasData(2 * cnt);
-
         short[] res = new short[cnt];
 
         for (int i = 0; i < cnt; i++)
@@ -107,15 +93,11 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public char readChar() {
-        ensureHasData(2);
-
         return buf.getChar();
     }
 
     /** {@inheritDoc} */
     @Override public char[] readCharArray(int cnt) {
-        ensureHasData(2 * cnt);
-
         char[] res = new char[cnt];
 
         for (int i = 0; i < cnt; i++)
@@ -126,15 +108,11 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public int readInt() {
-        ensureHasData(4);
-
         return buf.getInt();
     }
 
     /** {@inheritDoc} */
     @Override public int[] readIntArray(int cnt) {
-        ensureHasData(4 * cnt);
-
         int[] res = new int[cnt];
 
         for (int i = 0; i < cnt; i++)
@@ -145,15 +123,11 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public float readFloat() {
-        ensureHasData(4);
-
         return buf.getFloat();
     }
 
     /** {@inheritDoc} */
     @Override public float[] readFloatArray(int cnt) {
-        ensureHasData(4 * cnt);
-
         float[] res = new float[cnt];
 
         for (int i = 0; i < cnt; i++)
@@ -164,15 +138,11 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public long readLong() {
-        ensureHasData(8);
-
         return buf.getLong();
     }
 
     /** {@inheritDoc} */
     @Override public long[] readLongArray(int cnt) {
-        ensureHasData(8 * cnt);
-
         long[] res = new long[cnt];
 
         for (int i = 0; i < cnt; i++)
@@ -183,15 +153,11 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public double readDouble() {
-        ensureHasData(8);
-
         return buf.getDouble();
     }
 
     /** {@inheritDoc} */
     @Override public double[] readDoubleArray(int cnt) {
-        ensureHasData(8 * cnt);
-
         double[] res = new double[cnt];
 
         for (int i = 0; i < cnt; i++)
@@ -207,47 +173,17 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public byte readBytePositioned(int pos) {
-        int oldPos = buf.position();
-
-        buf.position(pos);
-
-        ensureHasData(1);
-
-        byte res = buf.get();
-
-        buf.position(oldPos);
-
-        return res;
+        return buf.get(pos);
     }
 
     /** {@inheritDoc} */
     @Override public short readShortPositioned(int pos) {
-        int oldPos = buf.position();
-
-        buf.position(pos);
-
-        ensureHasData(2);
-
-        short res = buf.getShort();
-
-        buf.position(oldPos);
-
-        return res;
+        return buf.getShort(pos);
     }
 
     /** {@inheritDoc} */
     @Override public int readIntPositioned(int pos) {
-        int oldPos = buf.position();
-
-        buf.position(pos);
-
-        ensureHasData(4);
-
-        byte res = buf.get();
-
-        buf.position(oldPos);
-
-        return res;
+        return buf.getInt(pos);
     }
 
     /** {@inheritDoc} */
@@ -277,7 +213,9 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
 
     /** {@inheritDoc} */
     @Override public byte[] arrayCopy() {
-        return buf.array();
+        byte[] arr = buf.array();
+
+        return Arrays.copyOf(arr, arr.length);
     }
 
     /** {@inheritDoc} */
@@ -289,13 +227,4 @@ public class BinaryByteBufferInputStream implements 
BinaryInputStream {
     @Override public boolean hasArray() {
         return false;
     }
-
-    /**
-     * @param cnt Remaining bytes.
-     */
-    private void ensureHasData(int cnt) {
-        if (buf.remaining() < cnt)
-            throw new BinaryObjectException("Not enough data to read the value 
" +
-                "[requiredBytes=" + cnt + ", remainingBytes=" + 
buf.remaining() + ']');
-    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index d4cb415..65d1c2d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -40,7 +41,7 @@ import 
org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
 import org.apache.ignite.client.IgniteClientFuture;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -353,11 +354,11 @@ class ClientComputeImpl implements ClientCompute, 
NotificationListener {
         ClientChannel ch,
         ClientOperation op,
         long rsrcId,
-        byte[] payload,
+        ByteBuffer payload,
         Exception err
     ) {
         if (op == ClientOperation.COMPUTE_TASK_FINISHED) {
-            Object res = payload == null ? null : utils.readObject(new 
BinaryHeapInputStream(payload), false);
+            Object res = payload == null ? null : 
utils.readObject(BinaryByteBufferInputStream.create(payload), false);
 
             ClientComputeTask<Object> task = addTask(ch, rsrcId);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java
new file mode 100644
index 0000000..4f964d8
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import javax.cache.configuration.Factory;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.ignite.client.SslMode;
+import org.apache.ignite.client.SslProtocol;
+import org.apache.ignite.configuration.ClientConfiguration;
+
+import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM;
+import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE;
+
+public class ClientSslUtils {
+    /** */
+    public static final char[] EMPTY_CHARS = new char[0];
+
+    /** Trust manager ignoring all certificate checks. */
+    private static final TrustManager ignoreErrorsTrustMgr = new 
X509TrustManager() {
+        /** */
+        @Override public X509Certificate[] getAcceptedIssuers() {
+            return null;
+        }
+
+        /** */
+        @Override public void checkServerTrusted(X509Certificate[] arg0, 
String arg1) {
+            // No-op.
+        }
+
+        /** */
+        @Override public void checkClientTrusted(X509Certificate[] arg0, 
String arg1) {
+            // No-op.
+        }
+    };
+
+    /**
+     * Gets SSL context for the given client configuration.
+     *
+     * @param cfg Configuration.
+     * @return {@link SSLContext} when SSL is enabled in the configuration; 
null otherwise.
+     */
+    public static SSLContext getSslContext(ClientConfiguration cfg) {
+        if (cfg.getSslMode() == SslMode.DISABLED)
+            return null;
+
+        Factory<SSLContext> sslCtxFactory = cfg.getSslContextFactory();
+
+        if (sslCtxFactory != null) {
+            try {
+                return sslCtxFactory.create();
+            }
+            catch (Exception e) {
+                throw new ClientError("SSL Context Factory failed", e);
+            }
+        }
+
+        BiFunction<String, String, String> or = (val, dflt) -> val == null || 
val.isEmpty() ? dflt : val;
+
+        String keyStore = or.apply(
+                cfg.getSslClientCertificateKeyStorePath(),
+                System.getProperty("javax.net.ssl.keyStore")
+        );
+
+        String keyStoreType = or.apply(
+                cfg.getSslClientCertificateKeyStoreType(),
+                or.apply(System.getProperty("javax.net.ssl.keyStoreType"), 
DFLT_STORE_TYPE)
+        );
+
+        String keyStorePwd = or.apply(
+                cfg.getSslClientCertificateKeyStorePassword(),
+                System.getProperty("javax.net.ssl.keyStorePassword")
+        );
+
+        String trustStore = or.apply(
+                cfg.getSslTrustCertificateKeyStorePath(),
+                System.getProperty("javax.net.ssl.trustStore")
+        );
+
+        String trustStoreType = or.apply(
+                cfg.getSslTrustCertificateKeyStoreType(),
+                or.apply(System.getProperty("javax.net.ssl.trustStoreType"), 
DFLT_STORE_TYPE)
+        );
+
+        String trustStorePwd = or.apply(
+                cfg.getSslTrustCertificateKeyStorePassword(),
+                System.getProperty("javax.net.ssl.trustStorePassword")
+        );
+
+        String algorithm = or.apply(cfg.getSslKeyAlgorithm(), 
DFLT_KEY_ALGORITHM);
+
+        String proto = toString(cfg.getSslProtocol());
+
+        if (Stream.of(keyStore, keyStorePwd, keyStoreType, trustStore, 
trustStorePwd, trustStoreType)
+                .allMatch(s -> s == null || s.isEmpty())
+        ) {
+            try {
+                return SSLContext.getDefault();
+            }
+            catch (NoSuchAlgorithmException e) {
+                throw new ClientError("Default SSL context cryptographic 
algorithm is not available", e);
+            }
+        }
+
+        KeyManager[] keyManagers = getKeyManagers(algorithm, keyStore, 
keyStoreType, keyStorePwd);
+
+        TrustManager[] trustManagers = cfg.isSslTrustAll() ?
+                new TrustManager[] {ignoreErrorsTrustMgr} :
+                getTrustManagers(algorithm, trustStore, trustStoreType, 
trustStorePwd);
+
+        try {
+            SSLContext sslCtx = SSLContext.getInstance(proto);
+
+            sslCtx.init(keyManagers, trustManagers, null);
+
+            return sslCtx;
+        }
+        catch (NoSuchAlgorithmException e) {
+            throw new ClientError("SSL context cryptographic algorithm is not 
available", e);
+        }
+        catch (KeyManagementException e) {
+            throw new ClientError("Failed to create SSL Context", e);
+        }
+    }
+
+    /**
+     * @return String representation of {@link SslProtocol} as required by 
{@link SSLContext}.
+     */
+    private static String toString(SslProtocol proto) {
+        switch (proto) {
+            case TLSv1_1:
+                return "TLSv1.1";
+
+            case TLSv1_2:
+                return "TLSv1.2";
+
+            default:
+                return proto.toString();
+        }
+    }
+
+    /** */
+    private static KeyManager[] getKeyManagers(
+            String algorithm,
+            String keyStore,
+            String keyStoreType,
+            String keyStorePwd
+    ) {
+        KeyManagerFactory keyMgrFactory;
+
+        try {
+            keyMgrFactory = KeyManagerFactory.getInstance(algorithm);
+        }
+        catch (NoSuchAlgorithmException e) {
+            throw new ClientError("Key manager cryptographic algorithm is not 
available", e);
+        }
+
+        Predicate<String> empty = s -> s == null || s.isEmpty();
+
+        if (!empty.test(keyStore) && !empty.test(keyStoreType)) {
+            char[] pwd = (keyStorePwd == null) ? EMPTY_CHARS : 
keyStorePwd.toCharArray();
+
+            KeyStore store = loadKeyStore("Client", keyStore, keyStoreType, 
pwd);
+
+            try {
+                keyMgrFactory.init(store, pwd);
+            }
+            catch (UnrecoverableKeyException e) {
+                throw new ClientError("Could not recover key store key", e);
+            }
+            catch (KeyStoreException e) {
+                throw new ClientError(
+                        String.format("Client key store provider of type [%s] 
is not available", keyStoreType),
+                        e
+                );
+            }
+            catch (NoSuchAlgorithmException e) {
+                throw new ClientError("Client key store integrity check 
algorithm is not available", e);
+            }
+        }
+
+        return keyMgrFactory.getKeyManagers();
+    }
+
+    /** */
+    private static TrustManager[] getTrustManagers(
+            String algorithm,
+            String trustStore,
+            String trustStoreType,
+            String trustStorePwd
+    ) {
+        TrustManagerFactory trustMgrFactory;
+
+        try {
+            trustMgrFactory = TrustManagerFactory.getInstance(algorithm);
+        }
+        catch (NoSuchAlgorithmException e) {
+            throw new ClientError("Trust manager cryptographic algorithm is 
not available", e);
+        }
+
+        Predicate<String> empty = s -> s == null || s.isEmpty();
+
+        if (!empty.test(trustStore) && !empty.test(trustStoreType)) {
+            char[] pwd = (trustStorePwd == null) ? EMPTY_CHARS : 
trustStorePwd.toCharArray();
+
+            KeyStore store = loadKeyStore("Trust", trustStore, trustStoreType, 
pwd);
+
+            try {
+                trustMgrFactory.init(store);
+            }
+            catch (KeyStoreException e) {
+                throw new ClientError(
+                        String.format("Trust key store provider of type [%s] 
is not available", trustStoreType),
+                        e
+                );
+            }
+        }
+
+        return trustMgrFactory.getTrustManagers();
+    }
+
+    /** */
+    private static KeyStore loadKeyStore(String lb, String path, String type, 
char[] pwd) {
+        KeyStore store;
+
+        try {
+            store = KeyStore.getInstance(type);
+        }
+        catch (KeyStoreException e) {
+            throw new ClientError(
+                    String.format("%s key store provider of type [%s] is not 
available", lb, type),
+                    e
+            );
+        }
+
+        try (InputStream in = new FileInputStream(new File(path))) {
+
+            store.load(in, pwd);
+
+            return store;
+        }
+        catch (FileNotFoundException e) {
+            throw new ClientError(String.format("%s key store file [%s] does 
not exist", lb, path), e);
+        }
+        catch (NoSuchAlgorithmException e) {
+            throw new ClientError(
+                    String.format("%s key store integrity check algorithm is 
not available", lb),
+                    e
+            );
+        }
+        catch (CertificateException e) {
+            throw new ClientError(String.format("Could not load certificate 
from %s key store", lb), e);
+        }
+        catch (IOException e) {
+            throw new ClientError(String.format("Could not read %s key store", 
lb), e);
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
index ae1b7fa..3aee483 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.client.thin;
 
+import java.nio.ByteBuffer;
+
 /**
  * Server to client notification listener.
  */
@@ -30,5 +32,5 @@ interface NotificationListener {
      * @param payload Notification payload or {@code null} if there is no 
payload.
      * @param err Error.
      */
-    public void acceptNotification(ClientChannel ch, ClientOperation op, long 
rsrcId, byte[] payload, Exception err);
+    public void acceptNotification(ClientChannel ch, ClientOperation op, long 
rsrcId, ByteBuffer payload, Exception err);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
index 76af7f2..f9d5978 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.client.thin;
 
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 
 /**
@@ -33,8 +35,8 @@ class PayloadInputChannel {
     /**
      * Constructor.
      */
-    PayloadInputChannel(ClientChannel ch, byte[] payload) {
-        in = new BinaryHeapInputStream(payload);
+    PayloadInputChannel(ClientChannel ch, ByteBuffer payload) {
+        in = BinaryByteBufferInputStream.create(payload);
         this.ch = ch;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index e7005be..195088d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.client.thin;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,13 +32,11 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -51,26 +50,21 @@ import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.IgniteClientFuture;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import 
org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientConnectionMultiplexer;
 import org.apache.ignite.internal.util.HostAndPortRange;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Communication channel with failover and partition awareness.
  */
 final class ReliableChannel implements AutoCloseable, NotificationListener {
-    /** Timeout to wait for executor service to shutdown (in milliseconds). */
-    private static final long EXECUTOR_SHUTDOWN_TIMEOUT = 10_000L;
-
     /** Do nothing helper function. */
     private static final Consumer<Integer> DO_NOTHING = (v) -> {};
 
-    /** Async runner thread name. */
-    static final String ASYNC_RUNNER_THREAD_NAME = 
"thin-client-channel-async-init";
-
     /** Channel factory. */
-    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory;
+    private final BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory;
 
     /** Client channel holders for each configured address. */
     private volatile List<ClientChannelHolder> channels;
@@ -96,19 +90,6 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
     /** Listeners of channel close events. */
     private final Collection<Consumer<ClientChannel>> channelCloseLsnrs = new 
CopyOnWriteArrayList<>();
 
-    /** Async tasks thread pool. */
-    private final ExecutorService asyncRunner = 
Executors.newSingleThreadExecutor(
-        new ThreadFactory() {
-            @Override public Thread newThread(@NotNull Runnable r) {
-                Thread thread = new Thread(r, ASYNC_RUNNER_THREAD_NAME);
-
-                thread.setDaemon(true);
-
-                return thread;
-            }
-        }
-    );
-
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
@@ -130,6 +111,9 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
     /** Guard channels and curChIdx together. */
     private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
 
+    /** Connection manager. */
+    private final ClientConnectionMultiplexer connMgr;
+
     /** Cache addresses returned by {@code ThinClientAddressFinder}. */
     private volatile String[] prevHostAddrs;
 
@@ -137,9 +121,9 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
      * Constructor.
      */
     ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
+            BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory,
+            ClientConfiguration clientCfg,
+            IgniteBinary binary
     ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
@@ -153,20 +137,16 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
         partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
         affinityCtx = new ClientCacheAffinityContext(binary);
+
+        connMgr = new GridNioClientConnectionMultiplexer(clientCfg);
+        connMgr.start();
     }
 
     /** {@inheritDoc} */
     @Override public synchronized void close() {
         closed = true;
 
-        asyncRunner.shutdown();
-
-        try {
-            asyncRunner.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS);
-        }
-        catch (InterruptedException ignore) {
-            // No-op.
-        }
+        connMgr.stop();
 
         List<ClientChannelHolder> holders = channels;
 
@@ -430,7 +410,7 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
         ClientChannel ch,
         ClientOperation op,
         long rsrcId,
-        byte[] payload,
+        ByteBuffer payload,
         Exception err
     ) {
         for (NotificationListener lsnr : notificationLsnrs) {
@@ -579,7 +559,7 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
      * Asynchronously try to establish a connection to all configured servers.
      */
     private void initAllChannelsAsync() {
-        asyncRunner.submit(
+        ForkJoinPool.commonPool().submit(
             () -> {
                 List<ClientChannelHolder> holders = channels;
 
@@ -608,7 +588,7 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
             if (scheduledChannelsReinit.compareAndSet(false, true)) {
                 // If partition awareness is disabled then only schedule and 
wait for the default channel to fail.
                 if (partitionAwarenessEnabled)
-                    asyncRunner.submit(this::channelsInit);
+                    ForkJoinPool.commonPool().submit(this::channelsInit);
             }
         }
     }
@@ -867,6 +847,7 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
     /**
      * Channels holder.
      */
+    @SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
     class ClientChannelHolder {
         /** Channel configuration. */
         private final ClientChannelConfiguration chCfg;
@@ -937,7 +918,7 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
                     if (!ignoreThrottling && applyReconnectionThrottling())
                         throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
 
-                    ClientChannel channel = chFactory.apply(chCfg);
+                    ClientChannel channel = chFactory.apply(chCfg, connMgr);
 
                     if (channel.serverNodeId() != null) {
                         
channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
@@ -1008,6 +989,7 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
     /**
      * Get holders reference. For test purposes.
      */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests.
     List<ClientChannelHolder> getChannelHolders() {
         return channels;
     }
@@ -1015,6 +997,7 @@ final class ReliableChannel implements AutoCloseable, 
NotificationListener {
     /**
      * Get node channels reference. For test purposes.
      */
+    @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests.
     Map<UUID, ClientChannelHolder> getNodeChannels() {
         return nodeChannels;
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 25df909..109c2a9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -17,21 +17,9 @@
 
 package org.apache.ignite.internal.client.thin;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -44,22 +32,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-import javax.cache.configuration.Factory;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.client.ClientAuthenticationException;
 import org.apache.ignite.client.ClientAuthorizationException;
@@ -67,19 +41,20 @@ import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
 import org.apache.ignite.client.ClientReconnectedException;
-import org.apache.ignite.client.SslMode;
-import org.apache.ignite.client.SslProtocol;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
 import org.apache.ignite.internal.binary.BinaryContext;
-import org.apache.ignite.internal.binary.BinaryPrimitives;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
@@ -103,19 +78,14 @@ import static 
org.apache.ignite.internal.client.thin.ProtocolVersion.V1_7_0;
 import static 
org.apache.ignite.internal.client.thin.ProtocolVersionFeature.AUTHORIZATION;
 import static 
org.apache.ignite.internal.client.thin.ProtocolVersionFeature.BITMAP_FEATURES;
 import static 
org.apache.ignite.internal.client.thin.ProtocolVersionFeature.PARTITION_AWARENESS;
-import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM;
-import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE;
 
 /**
  * Implements {@link ClientChannel} over TCP.
  */
-class TcpClientChannel implements ClientChannel {
+class TcpClientChannel implements ClientChannel, ClientMessageHandler, 
ClientConnectionStateHandler {
     /** Protocol version used by default on first connection attempt. */
     private static final ProtocolVersion DEFAULT_VERSION = LATEST_VER;
 
-    /** Receiver thread prefix. */
-    static final String RECEIVER_THREAD_PREFIX = "thin-client-channel#";
-
     /** Supported protocol versions. */
     private static final Collection<ProtocolVersion> supportedVers = 
Arrays.asList(
         V1_7_0,
@@ -128,30 +98,24 @@ class TcpClientChannel implements ClientChannel {
         V1_0_0
     );
 
+    /** Preallocated empty bytes. */
+    public static final byte[] EMPTY_BYTES = new byte[0];
+
     /** Protocol context. */
-    private ProtocolContext protocolCtx;
+    private volatile ProtocolContext protocolCtx;
 
     /** Server node ID. */
-    private UUID srvNodeId;
+    private volatile UUID srvNodeId;
 
     /** Server topology version. */
-    private AffinityTopologyVersion srvTopVer;
+    private volatile AffinityTopologyVersion srvTopVer;
 
     /** Channel. */
-    private final Socket sock;
-
-    /** Output stream. */
-    private final OutputStream out;
-
-    /** Data input. */
-    private final ByteCountingDataInput dataInput;
+    private final ClientConnection sock;
 
     /** Request id. */
     private final AtomicLong reqId = new AtomicLong(1);
 
-    /** Send lock. */
-    private final Lock sndLock = new ReentrantLock();
-
     /** Pending requests. */
     private final Map<Long, ClientRequestFuture> pendingReqs = new 
ConcurrentHashMap<>();
 
@@ -167,14 +131,11 @@ class TcpClientChannel implements ClientChannel {
     /** Executor for async operation listeners. */
     private final Executor asyncContinuationExecutor;
 
-    /** Receiver thread (processes incoming messages). */
-    private Thread receiverThread;
-
     /** Send/receive timeout in milliseconds. */
     private final int timeout;
 
     /** Constructor. */
-    TcpClientChannel(ClientChannelConfiguration cfg)
+    TcpClientChannel(ClientChannelConfiguration cfg, 
ClientConnectionMultiplexer connMgr)
         throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
         validateConfiguration(cfg);
 
@@ -183,21 +144,9 @@ class TcpClientChannel implements ClientChannel {
 
         timeout = cfg.getTimeout();
 
-        try {
-            sock = createSocket(cfg);
-
-            out = sock.getOutputStream();
-            dataInput = new ByteCountingDataInput(sock.getInputStream());
-
-            handshake(DEFAULT_VERSION, cfg.getUserName(), 
cfg.getUserPassword(), cfg.getUserAttributes());
+        sock = connMgr.open(cfg.getAddress(), this, this);
 
-            // Disable timeout on socket after handshake, instead, get future 
result with timeout in "receive" method.
-            if (timeout > 0)
-                sock.setSoTimeout(0);
-        }
-        catch (IOException e) {
-            throw handleIOError("addr=" + cfg.getAddress(), e);
-        }
+        handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), 
cfg.getUserAttributes());
     }
 
     /** {@inheritDoc} */
@@ -205,28 +154,25 @@ class TcpClientChannel implements ClientChannel {
         close(null);
     }
 
+    /** {@inheritDoc} */
+    @Override public void onMessage(ByteBuffer buf) {
+        processNextMessage(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(@Nullable Exception e) {
+        close(e);
+    }
+
     /**
      * Close the channel with cause.
      */
     private void close(Throwable cause) {
         if (closed.compareAndSet(false, true)) {
-            U.closeQuiet(dataInput);
-            U.closeQuiet(out);
             U.closeQuiet(sock);
 
-            sndLock.lock(); // Lock here to prevent creation of new pending 
requests.
-
-            try {
-                for (ClientRequestFuture pendingReq : pendingReqs.values())
-                    pendingReq.onDone(new ClientConnectionException("Channel 
is closed", cause));
-
-                if (receiverThread != null)
-                    receiverThread.interrupt();
-            }
-            finally {
-                sndLock.unlock();
-            }
-
+            for (ClientRequestFuture pendingReq : pendingReqs.values())
+                pendingReq.onDone(new ClientConnectionException("Channel is 
closed", cause));
         }
     }
 
@@ -251,7 +197,8 @@ class TcpClientChannel implements ClientChannel {
             ClientRequestFuture fut = send(op, payloadWriter);
 
             return receiveAsync(fut, payloadReader);
-        } catch (Throwable t) {
+        }
+        catch (Throwable t) {
             CompletableFuture<T> fut = new CompletableFuture<>();
             fut.completeExceptionally(t);
 
@@ -268,15 +215,10 @@ class TcpClientChannel implements ClientChannel {
         throws ClientException {
         long id = reqId.getAndIncrement();
 
-        // Only one thread at a time can have access to write to the channel.
-        sndLock.lock();
-
         try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
             if (closed())
                 throw new ClientConnectionException("Channel is closed");
 
-            initReceiverThread(); // Start the receiver thread with the first 
request.
-
             ClientRequestFuture fut = new ClientRequestFuture();
 
             pendingReqs.put(id, fut);
@@ -292,7 +234,8 @@ class TcpClientChannel implements ClientChannel {
 
             req.writeInt(0, req.position() - 4); // Actual size.
 
-            write(req.array(), req.position());
+            // arrayCopy is required, because buffer is pooled, and write is 
async.
+            write(req.arrayCopy(), req.position());
 
             return fut;
         }
@@ -301,9 +244,6 @@ class TcpClientChannel implements ClientChannel {
 
             throw t;
         }
-        finally {
-            sndLock.unlock();
-        }
     }
 
     /**
@@ -314,7 +254,7 @@ class TcpClientChannel implements ClientChannel {
     private <T> T receive(ClientRequestFuture pendingReq, 
Function<PayloadInputChannel, T> payloadReader)
         throws ClientException {
         try {
-            byte[] payload = timeout > 0 ? pendingReq.get(timeout) : 
pendingReq.get();
+            ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : 
pendingReq.get();
 
             if (payload == null || payloadReader == null)
                 return null;
@@ -338,7 +278,7 @@ class TcpClientChannel implements ClientChannel {
 
         pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(() 
-> {
             try {
-                byte[] payload = payloadFut.get();
+                ByteBuffer payload = payloadFut.get();
 
                 if (payload == null || payloadReader == null)
                     fut.complete(null);
@@ -346,7 +286,8 @@ class TcpClientChannel implements ClientChannel {
                     T res = payloadReader.apply(new PayloadInputChannel(this, 
payload));
                     fut.complete(res);
                 }
-            } catch (Throwable t) {
+            }
+            catch (Throwable t) {
                 fut.completeExceptionally(convertException(t));
             }
         }));
@@ -389,58 +330,29 @@ class TcpClientChannel implements ClientChannel {
     }
 
     /**
-     * Init and start receiver thread if it wasn't started before.
-     *
-     * Note: Method should be called only under external synchronization.
-     */
-    private void initReceiverThread() {
-        if (receiverThread == null) {
-            Socket sock = this.sock;
-
-            String sockInfo = sock == null ? null : 
sock.getInetAddress().getHostName() + ":" + sock.getPort();
-
-            receiverThread = new Thread(() -> {
-                try {
-                    while (!closed())
-                        processNextMessage();
-                }
-                catch (Throwable e) {
-                    close(e);
-                }
-            }, RECEIVER_THREAD_PREFIX + sockInfo);
-
-            receiverThread.setDaemon(true);
-
-            receiverThread.start();
-        }
-    }
-
-    /**
      * Process next message from the input stream and complete corresponding 
future.
      */
-    private void processNextMessage() throws ClientProtocolError, 
ClientConnectionException {
-        // blocking read a message header not to fall into a busy loop
-        int msgSize = dataInput.readInt(2048);
-
-        if (msgSize <= 0)
-            throw new ClientProtocolError(String.format("Invalid message size: 
%s", msgSize));
+    private void processNextMessage(ByteBuffer buf) throws 
ClientProtocolError, ClientConnectionException {
+        BinaryInputStream dataInput = BinaryByteBufferInputStream.create(buf);
 
-        long bytesReadOnStartMsg = dataInput.totalBytesRead();
+        if (protocolCtx == null) {
+            // Process handshake.
+            pendingReqs.remove(-1L).onDone(buf);
+            return;
+        }
 
-        long resId = dataInput.spinReadLong();
+        long resId = dataInput.readLong();
 
         int status = 0;
 
         ClientOperation notificationOp = null;
 
-        BinaryInputStream resIn;
-
         if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
-            short flags = dataInput.spinReadShort();
+            short flags = dataInput.readShort();
 
             if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
-                long topVer = dataInput.spinReadLong();
-                int minorTopVer = dataInput.spinReadInt();
+                long topVer = dataInput.readLong();
+                int minorTopVer = dataInput.readInt();
 
                 srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
 
@@ -449,7 +361,7 @@ class TcpClientChannel implements ClientChannel {
             }
 
             if ((flags & ClientFlag.NOTIFICATION) != 0) {
-                short notificationCode = dataInput.spinReadShort();
+                short notificationCode = dataInput.readShort();
 
                 notificationOp = ClientOperation.fromCode(notificationCode);
 
@@ -458,28 +370,25 @@ class TcpClientChannel implements ClientChannel {
             }
 
             if ((flags & ClientFlag.ERROR) != 0)
-                status = dataInput.spinReadInt();
+                status = dataInput.readInt();
         }
         else
-            status = dataInput.spinReadInt();
+            status = dataInput.readInt();
 
-        int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);
+        int hdrSize = dataInput.position();
+        int msgSize = buf.limit();
 
-        byte[] res = null;
+        ByteBuffer res = null;
         Exception err = null;
 
         if (status == 0) {
             if (msgSize > hdrSize)
-                res = dataInput.spinRead(msgSize - hdrSize);
+                res = buf;
         }
-        else if (status == ClientStatus.SECURITY_VIOLATION) {
-            dataInput.spinRead(msgSize - hdrSize); // Read message to the end.
-
+        else if (status == ClientStatus.SECURITY_VIOLATION)
             err = new ClientAuthorizationException();
-        } else {
-            resIn = new BinaryHeapInputStream(dataInput.spinRead(msgSize - 
hdrSize));
-
-            String errMsg = ClientUtils.createBinaryReader(null, 
resIn).readString();
+        else {
+            String errMsg = ClientUtils.createBinaryReader(null, 
dataInput).readString();
 
             err = new ClientServerError(errMsg, status, resId);
         }
@@ -543,31 +452,21 @@ class TcpClientChannel implements ClientChannel {
             throw new IllegalArgumentException(error);
     }
 
-    /** Create socket. */
-    private static Socket createSocket(ClientChannelConfiguration cfg) throws 
IOException {
-        Socket sock = cfg.getSslMode() == SslMode.REQUIRED ?
-            new ClientSslSocketFactory(cfg).create() :
-            new Socket(cfg.getAddress().getHostName(), 
cfg.getAddress().getPort());
-
-        sock.setTcpNoDelay(cfg.isTcpNoDelay());
-
-        if (cfg.getTimeout() > 0)
-            sock.setSoTimeout(cfg.getTimeout());
-
-        if (cfg.getSendBufferSize() > 0)
-            sock.setSendBufferSize(cfg.getSendBufferSize());
-
-        if (cfg.getReceiveBufferSize() > 0)
-            sock.setReceiveBufferSize(cfg.getReceiveBufferSize());
-
-        return sock;
-    }
-
     /** Client handshake. */
     private void handshake(ProtocolVersion ver, String user, String pwd, 
Map<String, String> userAttrs)
         throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
+        ClientRequestFuture fut = new ClientRequestFuture();
+        pendingReqs.put(-1L, fut);
+
         handshakeReq(ver, user, pwd, userAttrs);
-        handshakeRes(ver, user, pwd, userAttrs);
+
+        try {
+            ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get();
+            handshakeRes(res, ver, user, pwd, userAttrs);
+        }
+        catch (IgniteCheckedException e) {
+            throw new ClientConnectionException(e.getMessage(), e);
+        }
     }
 
     /** Send handshake request. */
@@ -604,7 +503,7 @@ class TcpClientChannel implements ClientChannel {
 
             writer.out().writeInt(0, writer.out().position() - 4);// actual 
size
 
-            write(writer.array(), writer.out().position());
+            write(writer.out().arrayCopy(), writer.out().position());
         }
     }
 
@@ -621,20 +520,15 @@ class TcpClientChannel implements ClientChannel {
     }
 
     /** Receive and handle handshake response. */
-    private void handshakeRes(ProtocolVersion proposedVer, String user, String 
pwd, Map<String, String> userAttrs)
+    private void handshakeRes(ByteBuffer buf, ProtocolVersion proposedVer, 
String user, String pwd, Map<String, String> userAttrs)
         throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
-        int resSize = dataInput.readInt();
-
-        if (resSize <= 0)
-            throw new ClientProtocolError(String.format("Invalid handshake 
response size: %s", resSize));
-
-        BinaryInputStream res = new 
BinaryHeapInputStream(dataInput.read(resSize));
+        BinaryInputStream res = BinaryByteBufferInputStream.create(buf);
 
         try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, 
res)) {
             boolean success = res.readBoolean();
 
             if (success) {
-                byte[] features = new byte[0];
+                byte[] features = EMPTY_BYTES;
 
                 if (ProtocolContext.isFeatureSupported(proposedVer, 
BITMAP_FEATURES))
                     features = reader.readByteArray();
@@ -680,12 +574,13 @@ class TcpClientChannel implements ClientChannel {
 
     /** Write bytes to the output stream. */
     private void write(byte[] bytes, int len) throws ClientConnectionException 
{
+        ByteBuffer buf = ByteBuffer.wrap(bytes, 0, len);
+
         try {
-            out.write(bytes, 0, len);
-            out.flush();
+            sock.send(buf);
         }
-        catch (IOException e) {
-            throw handleIOError(e);
+        catch (IgniteCheckedException e) {
+            throw new ClientConnectionException(e.getMessage(), e);
         }
     }
 
@@ -705,424 +600,8 @@ class TcpClientChannel implements ClientChannel {
     }
 
     /**
-     * Auxiliary class to read byte buffers and numeric values, counting total 
bytes read.
-     * Numeric values are read in the little-endian byte order.
-     */
-    private class ByteCountingDataInput implements AutoCloseable {
-        /** Input stream. */
-        private final InputStream in;
-
-        /** Total bytes read from the input stream. */
-        private long totalBytesRead;
-
-        /** Temporary buffer to read long, int and short values. */
-        private final byte[] tmpBuf = new byte[Long.BYTES];
-
-        /**
-         * @param in Input stream.
-         */
-        public ByteCountingDataInput(InputStream in) {
-            this.in = in;
-        }
-
-        /** Read bytes from the input stream. */
-        public byte[] read(int len) throws ClientConnectionException {
-            byte[] bytes = new byte[len];
-
-            read(bytes, len, 0);
-
-            return bytes;
-        }
-
-        /** Read bytes from the input stream. */
-        public byte[] spinRead(int len) {
-            byte[] bytes = new byte[len];
-
-            read(bytes, len, Integer.MAX_VALUE);
-
-            return bytes;
-        }
-
-        /**
-         * Read bytes from the input stream to the buffer.
-         *
-         * @param bytes Bytes buffer.
-         * @param len Length.
-         * @param tryReadCnt Number of reads before falling into blocking read.
-         */
-        public void read(byte[] bytes, int len, int tryReadCnt) throws 
ClientConnectionException {
-            int offset = 0;
-
-            try {
-                while (offset < len) {
-                    int toRead;
-
-                    if (tryReadCnt == 0)
-                        toRead = len - offset;
-                    else if ((toRead = Math.min(in.available(), len - offset)) 
== 0) {
-                        tryReadCnt--;
-
-                        continue;
-                    }
-
-                    int read = in.read(bytes, offset, toRead);
-
-                    if (read < 0)
-                        throw handleIOError(null);
-
-                    offset += read;
-                    totalBytesRead += read;
-                }
-            }
-            catch (IOException e) {
-                throw handleIOError(e);
-            }
-        }
-
-        /**
-         * Read long value from the input stream.
-         */
-        public long readLong() throws ClientConnectionException {
-            return readLong(0);
-        }
-
-        /**
-         * Read long value from the input stream.
-         */
-        public long spinReadLong() throws ClientConnectionException {
-            return readLong(Integer.MAX_VALUE);
-        }
-
-        /**
-         * Read long value from the input stream.
-         *
-         * @param tryReadCnt Number of reads before falling into blocking read.
-         */
-        private long readLong(int tryReadCnt) throws ClientConnectionException 
{
-            read(tmpBuf, Long.BYTES, tryReadCnt);
-
-            return BinaryPrimitives.readLong(tmpBuf, 0);
-        }
-
-        /**
-         * Read int value from the input stream.
-         */
-        public int readInt() throws ClientConnectionException {
-            return readInt(0);
-        }
-
-        /**
-         * Read int value from the input stream.
-         */
-        public int spinReadInt() throws ClientConnectionException {
-            return readInt(Integer.MAX_VALUE);
-        }
-
-        /**
-         * Read int value from the input stream.
-         *
-         * @param tryReadCnt Number of reads before falling into blocking read.
-         */
-        private int readInt(int tryReadCnt) throws ClientConnectionException {
-            read(tmpBuf, Integer.BYTES, tryReadCnt);
-
-            return BinaryPrimitives.readInt(tmpBuf, 0);
-        }
-
-        /**
-         * Read short value from the input stream.
-         */
-        public short readShort() throws ClientConnectionException {
-            return readShort(0);
-        }
-
-        /**
-         * Read short value from the input stream.
-         */
-        public short spinReadShort() throws ClientConnectionException {
-            return readShort(Integer.MAX_VALUE);
-        }
-
-        /**
-         * Read short value from the input stream.
-         *
-         * @param tryReadCnt Number of reads before falling into blocking read.
-         */
-        public short readShort(int tryReadCnt) throws 
ClientConnectionException {
-            read(tmpBuf, Short.BYTES, tryReadCnt);
-
-            return BinaryPrimitives.readShort(tmpBuf, 0);
-        }
-
-        /**
-         * Gets total bytes read from the input stream.
-         */
-        public long totalBytesRead() {
-            return totalBytesRead;
-        }
-
-        /**
-         * Close input stream.
-         */
-        @Override public void close() throws IOException {
-            in.close();
-        }
-    }
-
-    /**
      *
      */
-    private static class ClientRequestFuture extends GridFutureAdapter<byte[]> 
{
-    }
-
-    /** SSL Socket Factory. */
-    private static class ClientSslSocketFactory {
-        /** Trust manager ignoring all certificate checks. */
-        private static final TrustManager ignoreErrorsTrustMgr = new 
X509TrustManager() {
-            @Override public X509Certificate[] getAcceptedIssuers() {
-                return null;
-            }
-
-            @Override public void checkServerTrusted(X509Certificate[] arg0, 
String arg1) {
-            }
-
-            @Override public void checkClientTrusted(X509Certificate[] arg0, 
String arg1) {
-            }
-        };
-
-        /** Config. */
-        private final ClientChannelConfiguration cfg;
-
-        /** Constructor. */
-        ClientSslSocketFactory(ClientChannelConfiguration cfg) {
-            this.cfg = cfg;
-        }
-
-        /** Create SSL socket. */
-        SSLSocket create() throws IOException {
-            InetSocketAddress addr = cfg.getAddress();
-
-            SSLSocket sock = 
(SSLSocket)getSslSocketFactory(cfg).createSocket(addr.getHostName(), 
addr.getPort());
-
-            sock.setUseClientMode(true);
-
-            sock.startHandshake();
-
-            return sock;
-        }
-
-        /** Create SSL socket factory. */
-        private static SSLSocketFactory 
getSslSocketFactory(ClientChannelConfiguration cfg) {
-            Factory<SSLContext> sslCtxFactory = cfg.getSslContextFactory();
-
-            if (sslCtxFactory != null) {
-                try {
-                    return sslCtxFactory.create().getSocketFactory();
-                }
-                catch (Exception e) {
-                    throw new ClientError("SSL Context Factory failed", e);
-                }
-            }
-
-            BiFunction<String, String, String> or = (val, dflt) -> val == null 
|| val.isEmpty() ? dflt : val;
-
-            String keyStore = or.apply(
-                cfg.getSslClientCertificateKeyStorePath(),
-                System.getProperty("javax.net.ssl.keyStore")
-            );
-
-            String keyStoreType = or.apply(
-                cfg.getSslClientCertificateKeyStoreType(),
-                or.apply(System.getProperty("javax.net.ssl.keyStoreType"), 
DFLT_STORE_TYPE)
-            );
-
-            String keyStorePwd = or.apply(
-                cfg.getSslClientCertificateKeyStorePassword(),
-                System.getProperty("javax.net.ssl.keyStorePassword")
-            );
-
-            String trustStore = or.apply(
-                cfg.getSslTrustCertificateKeyStorePath(),
-                System.getProperty("javax.net.ssl.trustStore")
-            );
-
-            String trustStoreType = or.apply(
-                cfg.getSslTrustCertificateKeyStoreType(),
-                or.apply(System.getProperty("javax.net.ssl.trustStoreType"), 
DFLT_STORE_TYPE)
-            );
-
-            String trustStorePwd = or.apply(
-                cfg.getSslTrustCertificateKeyStorePassword(),
-                System.getProperty("javax.net.ssl.trustStorePassword")
-            );
-
-            String algorithm = or.apply(cfg.getSslKeyAlgorithm(), 
DFLT_KEY_ALGORITHM);
-
-            String proto = toString(cfg.getSslProtocol());
-
-            if (Stream.of(keyStore, keyStorePwd, keyStoreType, trustStore, 
trustStorePwd, trustStoreType)
-                .allMatch(s -> s == null || s.isEmpty())
-                ) {
-                try {
-                    return SSLContext.getDefault().getSocketFactory();
-                }
-                catch (NoSuchAlgorithmException e) {
-                    throw new ClientError("Default SSL context cryptographic 
algorithm is not available", e);
-                }
-            }
-
-            KeyManager[] keyManagers = getKeyManagers(algorithm, keyStore, 
keyStoreType, keyStorePwd);
-
-            TrustManager[] trustManagers = cfg.isSslTrustAll() ?
-                new TrustManager[] {ignoreErrorsTrustMgr} :
-                getTrustManagers(algorithm, trustStore, trustStoreType, 
trustStorePwd);
-
-            try {
-                SSLContext sslCtx = SSLContext.getInstance(proto);
-
-                sslCtx.init(keyManagers, trustManagers, null);
-
-                return sslCtx.getSocketFactory();
-            }
-            catch (NoSuchAlgorithmException e) {
-                throw new ClientError("SSL context cryptographic algorithm is 
not available", e);
-            }
-            catch (KeyManagementException e) {
-                throw new ClientError("Failed to create SSL Context", e);
-            }
-        }
-
-        /**
-         * @return String representation of {@link SslProtocol} as required by 
{@link SSLContext}.
-         */
-        private static String toString(SslProtocol proto) {
-            switch (proto) {
-                case TLSv1_1:
-                    return "TLSv1.1";
-
-                case TLSv1_2:
-                    return "TLSv1.2";
-
-                default:
-                    return proto.toString();
-            }
-        }
-
-        /** */
-        private static KeyManager[] getKeyManagers(
-            String algorithm,
-            String keyStore,
-            String keyStoreType,
-            String keyStorePwd
-        ) {
-            KeyManagerFactory keyMgrFactory;
-
-            try {
-                keyMgrFactory = KeyManagerFactory.getInstance(algorithm);
-            }
-            catch (NoSuchAlgorithmException e) {
-                throw new ClientError("Key manager cryptographic algorithm is 
not available", e);
-            }
-
-            Predicate<String> empty = s -> s == null || s.isEmpty();
-
-            if (!empty.test(keyStore) && !empty.test(keyStoreType)) {
-                char[] pwd = (keyStorePwd == null) ? new char[0] : 
keyStorePwd.toCharArray();
-
-                KeyStore store = loadKeyStore("Client", keyStore, 
keyStoreType, pwd);
-
-                try {
-                    keyMgrFactory.init(store, pwd);
-                }
-                catch (UnrecoverableKeyException e) {
-                    throw new ClientError("Could not recover key store key", 
e);
-                }
-                catch (KeyStoreException e) {
-                    throw new ClientError(
-                        String.format("Client key store provider of type [%s] 
is not available", keyStoreType),
-                        e
-                    );
-                }
-                catch (NoSuchAlgorithmException e) {
-                    throw new ClientError("Client key store integrity check 
algorithm is not available", e);
-                }
-            }
-
-            return keyMgrFactory.getKeyManagers();
-        }
-
-        /** */
-        private static TrustManager[] getTrustManagers(
-            String algorithm,
-            String trustStore,
-            String trustStoreType,
-            String trustStorePwd
-        ) {
-            TrustManagerFactory trustMgrFactory;
-
-            try {
-                trustMgrFactory = TrustManagerFactory.getInstance(algorithm);
-            }
-            catch (NoSuchAlgorithmException e) {
-                throw new ClientError("Trust manager cryptographic algorithm 
is not available", e);
-            }
-
-            Predicate<String> empty = s -> s == null || s.isEmpty();
-
-            if (!empty.test(trustStore) && !empty.test(trustStoreType)) {
-                char[] pwd = (trustStorePwd == null) ? new char[0] : 
trustStorePwd.toCharArray();
-
-                KeyStore store = loadKeyStore("Trust", trustStore, 
trustStoreType, pwd);
-
-                try {
-                    trustMgrFactory.init(store);
-                }
-                catch (KeyStoreException e) {
-                    throw new ClientError(
-                        String.format("Trust key store provider of type [%s] 
is not available", trustStoreType),
-                        e
-                    );
-                }
-            }
-
-            return trustMgrFactory.getTrustManagers();
-        }
-
-        /** */
-        private static KeyStore loadKeyStore(String lb, String path, String 
type, char[] pwd) {
-            KeyStore store;
-
-            try {
-                store = KeyStore.getInstance(type);
-            }
-            catch (KeyStoreException e) {
-                throw new ClientError(
-                    String.format("%s key store provider of type [%s] is not 
available", lb, type),
-                    e
-                );
-            }
-
-            try (InputStream in = new FileInputStream(new File(path))) {
-
-                store.load(in, pwd);
-
-                return store;
-            }
-            catch (FileNotFoundException e) {
-                throw new ClientError(String.format("%s key store file [%s] 
does not exist", lb, path), e);
-            }
-            catch (NoSuchAlgorithmException e) {
-                throw new ClientError(
-                    String.format("%s key store integrity check algorithm is 
not available", lb),
-                    e
-                );
-            }
-            catch (CertificateException e) {
-                throw new ClientError(String.format("Could not load 
certificate from %s key store", lb), e);
-            }
-            catch (IOException e) {
-                throw new ClientError(String.format("Could not read %s key 
store", lb), e);
-            }
-        }
+    private static class ClientRequestFuture extends 
GridFutureAdapter<ByteBuffer> {
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 9cea6a4..c67184a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -24,8 +24,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObjectException;
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.MarshallerContext;
@@ -101,8 +102,8 @@ public class TcpIgniteClient implements IgniteClient {
      * Constructor with custom channel factory.
      */
     TcpIgniteClient(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration cfg
+            BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory,
+            ClientConfiguration cfg
     ) throws ClientException {
         final ClientBinaryMetadataHandler metadataHandler = new 
ClientBinaryMetadataHandler();
 
@@ -116,18 +117,24 @@ public class TcpIgniteClient implements IgniteClient {
 
         ch = new ReliableChannel(chFactory, cfg, binary);
 
-        ch.channelsInit();
+        try {
+            ch.channelsInit();
 
-        ch.addChannelFailListener(() -> metadataHandler.onReconnect());
+            ch.addChannelFailListener(() -> metadataHandler.onReconnect());
 
-        transactions = new TcpClientTransactions(ch, marsh,
-            new 
ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
+            transactions = new TcpClientTransactions(ch, marsh,
+                    new 
ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
 
-        cluster = new ClientClusterImpl(ch, marsh);
+            cluster = new ClientClusterImpl(ch, marsh);
 
-        compute = new ClientComputeImpl(ch, marsh, 
cluster.defaultClusterGroup());
+            compute = new ClientComputeImpl(ch, marsh, 
cluster.defaultClusterGroup());
 
-        services = new ClientServicesImpl(ch, marsh, 
cluster.defaultClusterGroup());
+            services = new ClientServicesImpl(ch, marsh, 
cluster.defaultClusterGroup());
+        }
+        catch (Exception e) {
+            ch.close();
+            throw e;
+        }
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
similarity index 62%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
index ae1b7fa..eed90b6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
@@ -15,20 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.IgniteCheckedException;
 
 /**
- * Server to client notification listener.
+ * Client connection: abstracts away sending and receiving messages.
  */
-interface NotificationListener {
+public interface ClientConnection extends AutoCloseable {
     /**
-     * Accept notification.
+     * Sends a message.
      *
-     * @param ch Client channel which was notified.
-     * @param op Client operation.
-     * @param rsrcId Resource id.
-     * @param payload Notification payload or {@code null} if there is no 
payload.
-     * @param err Error.
+     * @param msg Message buffer.
+     */
+    void send(ByteBuffer msg) throws IgniteCheckedException;
+
+    /**
+     * Closes the connection.
      */
-    public void acceptNotification(ClientChannel ch, ClientOperation op, long 
rsrcId, byte[] payload, Exception err);
+    @Override void close();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java
new file mode 100644
index 0000000..891e2b3
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import java.net.InetSocketAddress;
+
+import org.apache.ignite.client.ClientConnectionException;
+
+/**
+ * Client connection multiplexer: manages multiple connections with a shared 
resource pool (worker threads, etc).
+ */
+public interface ClientConnectionMultiplexer {
+    /**
+     * Initializes this instance.
+     */
+    void start();
+
+    /**
+     * Stops this instance.
+     */
+    void stop();
+
+    /**
+     * Opens a new connection.
+     *
+     * @param addr Address.
+     * @param msgHnd Incoming message handler.
+     * @param stateHnd Connection state handler.
+     * @return Created connection.
+     * @throws ClientConnectionException when connection can't be established.
+     */
+    ClientConnection open(
+            InetSocketAddress addr,
+            ClientMessageHandler msgHnd,
+            ClientConnectionStateHandler stateHnd)
+            throws ClientConnectionException;
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
similarity index 62%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
index ae1b7fa..3f9481e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.internal.client.thin.io;
+
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Server to client notification listener.
+ * Handles thin client connection state.
  */
-interface NotificationListener {
+public interface ClientConnectionStateHandler {
     /**
-     * Accept notification.
-     *
-     * @param ch Client channel which was notified.
-     * @param op Client operation.
-     * @param rsrcId Resource id.
-     * @param payload Notification payload or {@code null} if there is no 
payload.
-     * @param err Error.
+     * Handles connection loss.
+     * @param e Exception that caused the disconnect, can be null.
      */
-    public void acceptNotification(ClientChannel ch, ClientOperation op, long 
rsrcId, byte[] payload, Exception err);
+    void onDisconnected(@Nullable Exception e);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java
new file mode 100644
index 0000000..06ab441
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Decodes thin client messages from partial buffers.
+ */
+public class ClientMessageDecoder {
+    /** */
+    private byte[] data;
+
+    /** */
+    private int cnt = -4;
+
+    /** */
+    private int msgSize;
+
+    /**
+     * Applies the next partial buffer.
+     *
+     * @param buf Buffer.
+     * @return Decoded message, or null when not yet complete.
+     */
+    public byte[] apply(ByteBuffer buf) {
+        boolean msgReady = read(buf);
+
+        return msgReady ? data : null;
+    }
+
+    /**
+     * Reads the buffer.
+     *
+     * @param buf Buffer.
+     * @return True when a complete message has been received; false otherwise.
+     */
+    @SuppressWarnings("DuplicatedCode") // A little duplication is better than 
a little dependency.
+    private boolean read(ByteBuffer buf) {
+        if (cnt < 0) {
+            for (; cnt < 0 && buf.hasRemaining(); cnt++)
+                msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt));
+
+            if (cnt < 0)
+                return false;
+
+            data = new byte[msgSize];
+        }
+
+        assert data != null;
+        assert cnt >= 0;
+        assert msgSize > 0;
+
+        int remaining = buf.remaining();
+
+        if (remaining > 0) {
+            int missing = msgSize - cnt;
+
+            if (missing > 0) {
+                int len = Math.min(missing, remaining);
+
+                buf.get(data, cnt, len);
+
+                cnt += len;
+            }
+        }
+
+        if (cnt == msgSize) {
+            cnt = -4;
+            msgSize = 0;
+
+            return true;
+        }
+
+        return false;
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java
similarity index 62%
copy from 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java
index ae1b7fa..a52859f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java
@@ -15,20 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
 
 /**
- * Server to client notification listener.
+ * Handles thin client responses and server -> client notifications.
  */
-interface NotificationListener {
+public interface ClientMessageHandler {
     /**
-     * Accept notification.
-     *
-     * @param ch Client channel which was notified.
-     * @param op Client operation.
-     * @param rsrcId Resource id.
-     * @param payload Notification payload or {@code null} if there is no 
payload.
-     * @param err Error.
+     * Handles messages from the server.
+     * @param buf Buffer.
      */
-    public void acceptNotification(ClientChannel ch, ClientOperation op, long 
rsrcId, byte[] payload, Exception err);
+    void onMessage(ByteBuffer buf);
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
new file mode 100644
index 0000000..e81d6f4
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+
+/**
+ * Client connection.
+ */
+class GridNioClientConnection implements ClientConnection {
+    /** */
+    static final int SES_META_CONN = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** */
+    private final GridNioSession ses;
+
+    /** */
+    private final ClientMessageHandler msgHnd;
+
+    /** */
+    private final ClientConnectionStateHandler stateHnd;
+
+    /**
+     * Ctor.
+     *
+     * @param ses Session.
+     */
+    public GridNioClientConnection(GridNioSession ses,
+                                   ClientMessageHandler msgHnd,
+                                   ClientConnectionStateHandler stateHnd) {
+        assert ses != null;
+        assert msgHnd != null;
+        assert stateHnd != null;
+
+        this.ses = ses;
+        this.msgHnd = msgHnd;
+        this.stateHnd = stateHnd;
+
+        ses.addMeta(SES_META_CONN, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void send(ByteBuffer msg) throws IgniteCheckedException {
+        ses.sendNoFuture(msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        ses.close();
+    }
+
+    /**
+     * Handles incoming message.
+     *
+     * @param msg Message.
+     */
+    void onMessage(ByteBuffer msg) {
+        assert msg != null;
+
+        msgHnd.onMessage(msg);
+    }
+
+    /**
+     * Handles disconnect.
+     *
+     * @param e Exception that caused the disconnect.
+     */
+    void onDisconnected(Exception e) {
+        stateHnd.onDisconnected(e);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
new file mode 100644
index 0000000..74a7025
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link 
org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements 
ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new 
GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, 
ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[] {codecFilter, sslFilter};
+        }
+        else
+            filters = new GridNioFilter[] {codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioClientListener())
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // Using more selectors does not seem to 
improve performance.
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .directBuffer(true)
+                    .directMode(false)
+                    .igniteInstanceName("thinClient")
+                    .serverName(THREAD_PREFIX)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+                    .socketSendBufferSize(cfg.getSendBufferSize())
+                    .tcpNoDelay(true)
+                    .build();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        srv.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        srv.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientConnection open(InetSocketAddress addr,
+                                           ClientMessageHandler msgHnd,
+                                           ClientConnectionStateHandler 
stateHnd)
+            throws ClientConnectionException {
+        try {
+            SocketChannel ch = SocketChannel.open();
+            ch.socket().connect(new InetSocketAddress(addr.getHostName(), 
addr.getPort()), Integer.MAX_VALUE);
+
+            Map<Integer, Object> meta = new HashMap<>();
+            GridNioFuture<?> sslHandshakeFut = null;
+
+            if (sslCtx != null) {
+                sslHandshakeFut = new GridNioFutureImpl<>(null);
+
+                meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, 
sslHandshakeFut);
+            }
+
+            GridNioSession ses = srv.createSession(ch, meta, false, 
null).get();
+
+            if (sslHandshakeFut != null)
+                sslHandshakeFut.get();
+
+            return new GridNioClientConnection(ses, msgHnd, stateHnd);
+        }
+        catch (Exception e) {
+            throw new ClientConnectionException(e.getMessage(), e);
+        }
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java
new file mode 100644
index 0000000..f33835d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client event listener.
+ */
+class GridNioClientListener implements GridNioServerListener<ByteBuffer> {
+    /** {@inheritDoc} */
+    @Override public void onConnected(GridNioSession ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
+        GridNioClientConnection conn = 
ses.meta(GridNioClientConnection.SES_META_CONN);
+
+        // Conn can be null when connection fails during initialization in 
open method.
+        if (conn != null)
+            conn.onDisconnected(e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessage(GridNioSession ses, ByteBuffer msg) {
+        GridNioClientConnection conn = 
ses.meta(GridNioClientConnection.SES_META_CONN);
+
+        assert conn != null : "Session must have an associated connection";
+
+        conn.onMessage(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionWriteTimeout(GridNioSession ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionIdleTimeout(GridNioSession ses) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onFailure(FailureType failureType, Throwable 
failure) {
+        // No-op.
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
new file mode 100644
index 0000000..439c78a
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client message parser.
+ */
+class GridNioClientParser implements GridNioParser {
+    /** */
+    private static final int SES_META_DECODER = 
GridNioSessionMetaKey.nextUniqueKey();
+
+    /** {@inheritDoc} */
+    @Override public @Nullable Object decode(GridNioSession ses, ByteBuffer 
buf) {
+        ClientMessageDecoder decoder = ses.meta(SES_META_DECODER);
+
+        if (decoder == null) {
+            decoder = new ClientMessageDecoder();
+
+            ses.addMeta(SES_META_DECODER, decoder);
+        }
+
+        byte[] bytes = decoder.apply(buf);
+
+        if (bytes == null)
+            return null; // Message is not yet completely received.
+
+        // Thin client protocol is little-endian. ByteBuffer will handle 
conversion as necessary on big-endian systems.
+        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) {
+        return (ByteBuffer)msg;
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
index bbb2c87..2d75d5c 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
@@ -71,14 +71,20 @@ public class ConnectToStartingNodeTest extends 
AbstractThinClientTest {
         IgniteInternalFuture<IgniteClient> futStartClient = 
GridTestUtils.runAsync(
             () -> startClient(grid()));
 
-        // Server doesn't accept connection before discovery SPI started.
-        assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 
500L));
+        try {
+            // Server doesn't accept connection before discovery SPI started.
+            assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 
500L));
 
-        barrier.await();
+            barrier.await();
 
-        futStartGrid.get();
+            futStartGrid.get();
 
-        // Server accept connection after discovery SPI started.
-        assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 
500L));
+            // Server accept connection after discovery SPI started.
+            assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 
500L));
+        }
+        finally {
+            if (futStartClient.isDone())
+                futStartClient.get().close();
+        }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java 
b/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java
index 0f0791b..c6def06 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java
@@ -288,7 +288,7 @@ public class SslParametersTest extends 
GridCommonAbstractTest {
             cipherSuites,
             protocols,
             ClientConnectionException.class,
-            "Ignite cluster is unavailable"
+            "SSL handshake failed"
         );
     }
 
@@ -307,7 +307,7 @@ public class SslParametersTest extends 
GridCommonAbstractTest {
         this.cipherSuites = F.isEmpty(cipherSuites) ? null : cipherSuites;
         this.protocols = F.isEmpty(protocols) ? null : protocols;
 
-        GridTestUtils.assertThrows(
+        GridTestUtils.assertThrowsAnyCause(
             null,
             new Callable<Object>() {
                 @Override public Object call() {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 61adf66..686a193 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -35,6 +36,7 @@ import org.apache.ignite.client.ClientAuthorizationException;
 import org.apache.ignite.client.ClientConnectionException;
 import org.apache.ignite.client.ClientException;
 import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -51,7 +53,8 @@ import static org.mockito.Mockito.mock;
  */
 public class ReliableChannelTest {
     /** Mock factory for creating new channels. */
-    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory = cfg -> new TestClientChannel();
+    private final BiFunction<ClientChannelConfiguration, 
ClientConnectionMultiplexer, ClientChannel> chFactory =
+            (cfg, hnd) -> new TestClientChannel();
 
     /** */
     private final String[] dfltAddrs = new String[]{"127.0.0.1:10800", 
"127.0.0.1:10801", "127.0.0.1:10802"};
@@ -259,7 +262,7 @@ public class ReliableChannelTest {
             .setAddresses(dfltAddrs)
             .setPartitionAwarenessEnabled(true);
 
-        ReliableChannel rc = new ReliableChannel(cfg -> new 
TestFailureClientChannel(), ccfg, null);
+        ReliableChannel rc = new ReliableChannel((cfg, hnd) -> new 
TestFailureClientChannel(), ccfg, null);
 
         rc.channelsInit();
     }
@@ -302,7 +305,7 @@ public class ReliableChannelTest {
         // Emulate cluster is down after TcpClientChannel#send operation.
         AtomicInteger step = new AtomicInteger();
 
-        ReliableChannel rc = new ReliableChannel(cfg -> {
+        ReliableChannel rc = new ReliableChannel((cfg, hnd) -> {
             if (step.getAndIncrement() == 0)
                 return new TestAsyncServiceFailureClientChannel();
             else
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index dd716d6..7eda71f 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -185,11 +186,11 @@ public abstract class 
ThinClientAbstractPartitionAwarenessTest extends GridCommo
      * @param chIdxs Channels to wait for initialization.
      */
     protected void initClient(ClientConfiguration clientCfg, int... chIdxs) 
throws IgniteInterruptedCheckedException {
-        client = new TcpIgniteClient(cfg -> {
+        client = new TcpIgniteClient((cfg, hnd) -> {
             try {
                 log.info("Establishing connection to " + cfg.getAddress());
 
-                TcpClientChannel ch = new TestTcpClientChannel(cfg);
+                TcpClientChannel ch = new TestTcpClientChannel(cfg, hnd);
 
                 log.info("Channel initialized: " + ch);
 
@@ -323,8 +324,8 @@ public abstract class 
ThinClientAbstractPartitionAwarenessTest extends GridCommo
         /**
          * @param cfg Config.
          */
-        public TestTcpClientChannel(ClientChannelConfiguration cfg) {
-            super(cfg);
+        public TestTcpClientChannel(ClientChannelConfiguration cfg, 
ClientConnectionMultiplexer hnd) {
+            super(cfg, hnd);
 
             this.cfg = cfg;
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 7dc6222..2909c4e 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -23,13 +23,13 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.junit.Test;
 
-import static 
org.apache.ignite.internal.client.thin.ReliableChannel.ASYNC_RUNNER_THREAD_NAME;
-import static 
org.apache.ignite.internal.client.thin.TcpClientChannel.RECEIVER_THREAD_PREFIX;
-
 /**
  * Test resource releasing by thin client.
  */
 public class ThinClientPartitionAwarenessResourceReleaseTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
     /**
      * Test that resources are correctly released after closing client with 
partition awareness.
      */
@@ -46,15 +46,13 @@ public class 
ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
 
         assertFalse(channels[0].isClosed());
         assertFalse(channels[1].isClosed());
-        assertEquals(1, threadsCount(ASYNC_RUNNER_THREAD_NAME));
-        assertEquals(2, threadsCount(RECEIVER_THREAD_PREFIX));
+        assertEquals(1, threadsCount(THREAD_PREFIX));
 
         client.close();
 
         assertTrue(channels[0].isClosed());
         assertTrue(channels[1].isClosed());
-        assertTrue(GridTestUtils.waitForCondition(() -> 
threadsCount(ASYNC_RUNNER_THREAD_NAME) == 0, 1_000L));
-        assertTrue(GridTestUtils.waitForCondition(() -> 
threadsCount(RECEIVER_THREAD_PREFIX) == 0, 1_000L));
+        assertTrue(GridTestUtils.waitForCondition(() -> 
threadsCount(THREAD_PREFIX) == 0, 1_000L));
     }
 
     /**
@@ -68,7 +66,7 @@ public class ThinClientPartitionAwarenessResourceReleaseTest 
extends ThinClientA
         for (long id : threadIds) {
             ThreadInfo info = U.getThreadMx().getThreadInfo(id);
 
-            if (info != null && info.getThreadState() != 
Thread.State.TERMINATED && info.getThreadName().startsWith(name))
+            if (info != null && info.getThreadState() != 
Thread.State.TERMINATED && info.getThreadName().contains(name))
                 cnt++;
         }
 

Reply via email to