This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 338165a IGNITE-13496 Java thin: make async API non-blocking with GridNioServer 338165a is described below commit 338165afadd3b6979b4655ee2f03f3b9c2228236 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Wed Dec 2 19:39:25 2020 +0300 IGNITE-13496 Java thin: make async API non-blocking with GridNioServer Refactor Java Thin Client to use GridNioServer in client mode: * Client threads are never blocked * Single worker thread is shared across all connections within `IgniteClient` Benchmark results (i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275): Before Benchmark Mode Cnt Score Error Units JmhThinClientCacheBenchmark.get thrpt 10 65916.805 ± 2118.954 ops/s JmhThinClientCacheBenchmark.put thrpt 10 62304.444 ± 2521.371 ops/s After Benchmark Mode Cnt Score Error Units JmhThinClientCacheBenchmark.get thrpt 10 92501.557 ± 1380.384 ops/s JmhThinClientCacheBenchmark.put thrpt 10 82907.446 ± 7572.537 ops/s --- .../jmh/thin/JmhThinClientAbstractBenchmark.java | 135 ++++ .../jmh/thin/JmhThinClientCacheBenchmark.java | 81 +++ .../streams/BinaryByteBufferInputStream.java | 91 +-- .../internal/client/thin/ClientComputeImpl.java | 7 +- .../internal/client/thin/ClientSslUtils.java | 293 +++++++++ .../internal/client/thin/NotificationListener.java | 4 +- .../internal/client/thin/PayloadInputChannel.java | 8 +- .../internal/client/thin/ReliableChannel.java | 63 +- .../internal/client/thin/TcpClientChannel.java | 679 +++------------------ .../internal/client/thin/TcpIgniteClient.java | 27 +- .../ClientConnection.java} | 25 +- .../thin/io/ClientConnectionMultiplexer.java | 52 ++ .../ClientConnectionStateHandler.java} | 19 +- .../client/thin/io/ClientMessageDecoder.java | 92 +++ .../ClientMessageHandler.java} | 19 +- .../io/gridnioserver/GridNioClientConnection.java | 93 +++ .../GridNioClientConnectionMultiplexer.java | 147 +++++ .../io/gridnioserver/GridNioClientListener.java | 73 +++ .../thin/io/gridnioserver/GridNioClientParser.java | 59 ++ .../ignite/client/ConnectToStartingNodeTest.java | 18 +- .../apache/ignite/client/SslParametersTest.java | 4 +- .../internal/client/thin/ReliableChannelTest.java | 9 +- .../ThinClientAbstractPartitionAwarenessTest.java | 9 +- ...lientPartitionAwarenessResourceReleaseTest.java | 14 +- 24 files changed, 1228 insertions(+), 793 deletions(-) diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java new file mode 100644 index 0000000..6b6dc53 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.thin; + +import java.util.stream.IntStream; + +import org.apache.ignite.Ignite; +import org.apache.ignite.Ignition; +import org.apache.ignite.client.ClientCache; +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +/** + * Base class for thin client benchmarks. + */ +@State(Scope.Benchmark) +public abstract class JmhThinClientAbstractBenchmark extends JmhAbstractBenchmark { + /** Property: nodes count. */ + protected static final String PROP_DATA_NODES = "ignite.jmh.thin.dataNodes"; + + /** Default amount of nodes. */ + protected static final int DFLT_DATA_NODES = 4; + + /** Items count. */ + protected static final int CNT = 1000; + + /** Cache value. */ + protected static final byte[] PAYLOAD = new byte[1000]; + + /** IP finder shared across nodes. */ + private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Default cache name. */ + private static final String DEFAULT_CACHE_NAME = "default"; + + /** Target node. */ + protected Ignite node; + + /** Target cache. */ + protected ClientCache<Integer, byte[]> cache; + + /** Thin client. */ + protected IgniteClient client; + + /** + * Setup routine. Child classes must invoke this method first. + * + */ + @Setup + public void setup() { + System.out.println(); + System.out.println("--------------------"); + System.out.println("IGNITE BENCHMARK INFO: "); + System.out.println("\tdata nodes: " + intProperty(PROP_DATA_NODES, DFLT_DATA_NODES)); + System.out.println("--------------------"); + System.out.println(); + + int nodesCnt = intProperty(PROP_DATA_NODES, DFLT_DATA_NODES); + + A.ensure(nodesCnt >= 1, "nodesCnt >= 1"); + + node = Ignition.start(configuration("node0")); + + for (int i = 1; i < nodesCnt; i++) + Ignition.start(configuration("node" + i)); + + String[] addrs = IntStream + .range(10800, 10800 + nodesCnt) + .mapToObj(p -> "127.0.0.1:" + p) + .toArray(String[]::new); + + ClientConfiguration cfg = new ClientConfiguration() + .setAddresses(addrs) + .setPartitionAwarenessEnabled(true); + + client = Ignition.startClient(cfg); + + cache = client.getOrCreateCache(DEFAULT_CACHE_NAME); + + System.out.println("Loading test data..."); + + for (int i = 0; i < CNT; i++) + cache.put(i, PAYLOAD); + + System.out.println("Test data loaded: " + CNT); + } + + /** + * Tear down routine. + * + */ + @TearDown + public void tearDown() throws Exception { + client.close(); + Ignition.stopAll(true); + } + + /** + * Create Ignite configuration. + * + * @param igniteInstanceName Ignite instance name. + * @return Configuration. + */ + protected IgniteConfiguration configuration(String igniteInstanceName) { + + return new IgniteConfiguration() + .setIgniteInstanceName(igniteInstanceName) + .setLocalHost("127.0.0.1") + .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + } +} diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java new file mode 100644 index 0000000..88e6a87 --- /dev/null +++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.benchmarks.jmh.thin; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Mode; + +/** + * Thin client cache benchmark. + * + * Results on i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275: + * Benchmark Mode Cnt Score Error Units + * JmhThinClientCacheBenchmark.get thrpt 10 92501.557 ± 1380.384 ops/s + * JmhThinClientCacheBenchmark.put thrpt 10 82907.446 ± 7572.537 ops/s + * + * JmhThinClientCacheBenchmark.get avgt 10 41.505 ± 1.018 us/op + * JmhThinClientCacheBenchmark.put avgt 10 44.623 ± 0.779 us/op + */ +public class JmhThinClientCacheBenchmark extends JmhThinClientAbstractBenchmark { + /** + * Cache put benchmark. + */ + @Benchmark + public void put() { + int key = ThreadLocalRandom.current().nextInt(CNT); + + cache.put(key, PAYLOAD); + } + + /** + * Cache get benchmark. + */ + @Benchmark + public Object get() { + int key = ThreadLocalRandom.current().nextInt(CNT); + + return cache.get(key); + } + + /** + * Run benchmarks. + * + * @param args Arguments. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + JmhIdeBenchmarkRunner runner = JmhIdeBenchmarkRunner.create() + .forks(1) + .threads(4) + .benchmarks(JmhThinClientCacheBenchmark.class.getSimpleName()) + .jvmArguments("-Xms4g", "-Xmx4g"); + + runner + .benchmarkModes(Mode.Throughput) + .run(); + + runner + .benchmarkModes(Mode.AverageTime) + .outputTimeUnit(TimeUnit.MICROSECONDS) + .run(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java index d277948..fe138e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java @@ -18,14 +18,14 @@ package org.apache.ignite.internal.binary.streams; import java.nio.ByteBuffer; -import org.apache.ignite.binary.BinaryObjectException; +import java.util.Arrays; /** - * + * Input stream over {@link ByteBuffer}. */ public class BinaryByteBufferInputStream implements BinaryInputStream { /** */ - private ByteBuffer buf; + private final ByteBuffer buf; /** * @param buf Buffer to wrap. @@ -44,15 +44,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public byte readByte() { - ensureHasData(1); - return buf.get(); } /** {@inheritDoc} */ @Override public byte[] readByteArray(int cnt) { - ensureHasData(cnt); - byte[] data = new byte[cnt]; buf.get(data); @@ -62,22 +58,16 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public int read(byte[] arr, int off, int cnt) { - ensureHasData(cnt); - return 0; } /** {@inheritDoc} */ @Override public boolean readBoolean() { - ensureHasData(1); - - return false; + return readByte() == 1; } /** {@inheritDoc} */ @Override public boolean[] readBooleanArray(int cnt) { - ensureHasData(cnt); - boolean[] res = new boolean[cnt]; for (int i = 0; i < cnt; i++) @@ -88,15 +78,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public short readShort() { - ensureHasData(2); - return buf.getShort(); } /** {@inheritDoc} */ @Override public short[] readShortArray(int cnt) { - ensureHasData(2 * cnt); - short[] res = new short[cnt]; for (int i = 0; i < cnt; i++) @@ -107,15 +93,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public char readChar() { - ensureHasData(2); - return buf.getChar(); } /** {@inheritDoc} */ @Override public char[] readCharArray(int cnt) { - ensureHasData(2 * cnt); - char[] res = new char[cnt]; for (int i = 0; i < cnt; i++) @@ -126,15 +108,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public int readInt() { - ensureHasData(4); - return buf.getInt(); } /** {@inheritDoc} */ @Override public int[] readIntArray(int cnt) { - ensureHasData(4 * cnt); - int[] res = new int[cnt]; for (int i = 0; i < cnt; i++) @@ -145,15 +123,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public float readFloat() { - ensureHasData(4); - return buf.getFloat(); } /** {@inheritDoc} */ @Override public float[] readFloatArray(int cnt) { - ensureHasData(4 * cnt); - float[] res = new float[cnt]; for (int i = 0; i < cnt; i++) @@ -164,15 +138,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public long readLong() { - ensureHasData(8); - return buf.getLong(); } /** {@inheritDoc} */ @Override public long[] readLongArray(int cnt) { - ensureHasData(8 * cnt); - long[] res = new long[cnt]; for (int i = 0; i < cnt; i++) @@ -183,15 +153,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public double readDouble() { - ensureHasData(8); - return buf.getDouble(); } /** {@inheritDoc} */ @Override public double[] readDoubleArray(int cnt) { - ensureHasData(8 * cnt); - double[] res = new double[cnt]; for (int i = 0; i < cnt; i++) @@ -207,47 +173,17 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public byte readBytePositioned(int pos) { - int oldPos = buf.position(); - - buf.position(pos); - - ensureHasData(1); - - byte res = buf.get(); - - buf.position(oldPos); - - return res; + return buf.get(pos); } /** {@inheritDoc} */ @Override public short readShortPositioned(int pos) { - int oldPos = buf.position(); - - buf.position(pos); - - ensureHasData(2); - - short res = buf.getShort(); - - buf.position(oldPos); - - return res; + return buf.getShort(pos); } /** {@inheritDoc} */ @Override public int readIntPositioned(int pos) { - int oldPos = buf.position(); - - buf.position(pos); - - ensureHasData(4); - - byte res = buf.get(); - - buf.position(oldPos); - - return res; + return buf.getInt(pos); } /** {@inheritDoc} */ @@ -277,7 +213,9 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { /** {@inheritDoc} */ @Override public byte[] arrayCopy() { - return buf.array(); + byte[] arr = buf.array(); + + return Arrays.copyOf(arr, arr.length); } /** {@inheritDoc} */ @@ -289,13 +227,4 @@ public class BinaryByteBufferInputStream implements BinaryInputStream { @Override public boolean hasArray() { return false; } - - /** - * @param cnt Remaining bytes. - */ - private void ensureHasData(int cnt) { - if (buf.remaining() < cnt) - throw new BinaryObjectException("Not enough data to read the value " + - "[requiredBytes=" + cnt + ", remainingBytes=" + buf.remaining() + ']'); - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java index d4cb415..65d1c2d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.client.thin; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -40,7 +41,7 @@ import org.apache.ignite.client.ClientFeatureNotSupportedByServerException; import org.apache.ignite.client.IgniteClientFuture; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawWriterEx; -import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream; import org.apache.ignite.internal.processors.platform.client.ClientStatus; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -353,11 +354,11 @@ class ClientComputeImpl implements ClientCompute, NotificationListener { ClientChannel ch, ClientOperation op, long rsrcId, - byte[] payload, + ByteBuffer payload, Exception err ) { if (op == ClientOperation.COMPUTE_TASK_FINISHED) { - Object res = payload == null ? null : utils.readObject(new BinaryHeapInputStream(payload), false); + Object res = payload == null ? null : utils.readObject(BinaryByteBufferInputStream.create(payload), false); ClientComputeTask<Object> task = addTask(ch, rsrcId); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java new file mode 100644 index 0000000..4f964d8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.function.BiFunction; +import java.util.function.Predicate; +import java.util.stream.Stream; +import javax.cache.configuration.Factory; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.TrustManagerFactory; +import javax.net.ssl.X509TrustManager; + +import org.apache.ignite.client.SslMode; +import org.apache.ignite.client.SslProtocol; +import org.apache.ignite.configuration.ClientConfiguration; + +import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM; +import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE; + +public class ClientSslUtils { + /** */ + public static final char[] EMPTY_CHARS = new char[0]; + + /** Trust manager ignoring all certificate checks. */ + private static final TrustManager ignoreErrorsTrustMgr = new X509TrustManager() { + /** */ + @Override public X509Certificate[] getAcceptedIssuers() { + return null; + } + + /** */ + @Override public void checkServerTrusted(X509Certificate[] arg0, String arg1) { + // No-op. + } + + /** */ + @Override public void checkClientTrusted(X509Certificate[] arg0, String arg1) { + // No-op. + } + }; + + /** + * Gets SSL context for the given client configuration. + * + * @param cfg Configuration. + * @return {@link SSLContext} when SSL is enabled in the configuration; null otherwise. + */ + public static SSLContext getSslContext(ClientConfiguration cfg) { + if (cfg.getSslMode() == SslMode.DISABLED) + return null; + + Factory<SSLContext> sslCtxFactory = cfg.getSslContextFactory(); + + if (sslCtxFactory != null) { + try { + return sslCtxFactory.create(); + } + catch (Exception e) { + throw new ClientError("SSL Context Factory failed", e); + } + } + + BiFunction<String, String, String> or = (val, dflt) -> val == null || val.isEmpty() ? dflt : val; + + String keyStore = or.apply( + cfg.getSslClientCertificateKeyStorePath(), + System.getProperty("javax.net.ssl.keyStore") + ); + + String keyStoreType = or.apply( + cfg.getSslClientCertificateKeyStoreType(), + or.apply(System.getProperty("javax.net.ssl.keyStoreType"), DFLT_STORE_TYPE) + ); + + String keyStorePwd = or.apply( + cfg.getSslClientCertificateKeyStorePassword(), + System.getProperty("javax.net.ssl.keyStorePassword") + ); + + String trustStore = or.apply( + cfg.getSslTrustCertificateKeyStorePath(), + System.getProperty("javax.net.ssl.trustStore") + ); + + String trustStoreType = or.apply( + cfg.getSslTrustCertificateKeyStoreType(), + or.apply(System.getProperty("javax.net.ssl.trustStoreType"), DFLT_STORE_TYPE) + ); + + String trustStorePwd = or.apply( + cfg.getSslTrustCertificateKeyStorePassword(), + System.getProperty("javax.net.ssl.trustStorePassword") + ); + + String algorithm = or.apply(cfg.getSslKeyAlgorithm(), DFLT_KEY_ALGORITHM); + + String proto = toString(cfg.getSslProtocol()); + + if (Stream.of(keyStore, keyStorePwd, keyStoreType, trustStore, trustStorePwd, trustStoreType) + .allMatch(s -> s == null || s.isEmpty()) + ) { + try { + return SSLContext.getDefault(); + } + catch (NoSuchAlgorithmException e) { + throw new ClientError("Default SSL context cryptographic algorithm is not available", e); + } + } + + KeyManager[] keyManagers = getKeyManagers(algorithm, keyStore, keyStoreType, keyStorePwd); + + TrustManager[] trustManagers = cfg.isSslTrustAll() ? + new TrustManager[] {ignoreErrorsTrustMgr} : + getTrustManagers(algorithm, trustStore, trustStoreType, trustStorePwd); + + try { + SSLContext sslCtx = SSLContext.getInstance(proto); + + sslCtx.init(keyManagers, trustManagers, null); + + return sslCtx; + } + catch (NoSuchAlgorithmException e) { + throw new ClientError("SSL context cryptographic algorithm is not available", e); + } + catch (KeyManagementException e) { + throw new ClientError("Failed to create SSL Context", e); + } + } + + /** + * @return String representation of {@link SslProtocol} as required by {@link SSLContext}. + */ + private static String toString(SslProtocol proto) { + switch (proto) { + case TLSv1_1: + return "TLSv1.1"; + + case TLSv1_2: + return "TLSv1.2"; + + default: + return proto.toString(); + } + } + + /** */ + private static KeyManager[] getKeyManagers( + String algorithm, + String keyStore, + String keyStoreType, + String keyStorePwd + ) { + KeyManagerFactory keyMgrFactory; + + try { + keyMgrFactory = KeyManagerFactory.getInstance(algorithm); + } + catch (NoSuchAlgorithmException e) { + throw new ClientError("Key manager cryptographic algorithm is not available", e); + } + + Predicate<String> empty = s -> s == null || s.isEmpty(); + + if (!empty.test(keyStore) && !empty.test(keyStoreType)) { + char[] pwd = (keyStorePwd == null) ? EMPTY_CHARS : keyStorePwd.toCharArray(); + + KeyStore store = loadKeyStore("Client", keyStore, keyStoreType, pwd); + + try { + keyMgrFactory.init(store, pwd); + } + catch (UnrecoverableKeyException e) { + throw new ClientError("Could not recover key store key", e); + } + catch (KeyStoreException e) { + throw new ClientError( + String.format("Client key store provider of type [%s] is not available", keyStoreType), + e + ); + } + catch (NoSuchAlgorithmException e) { + throw new ClientError("Client key store integrity check algorithm is not available", e); + } + } + + return keyMgrFactory.getKeyManagers(); + } + + /** */ + private static TrustManager[] getTrustManagers( + String algorithm, + String trustStore, + String trustStoreType, + String trustStorePwd + ) { + TrustManagerFactory trustMgrFactory; + + try { + trustMgrFactory = TrustManagerFactory.getInstance(algorithm); + } + catch (NoSuchAlgorithmException e) { + throw new ClientError("Trust manager cryptographic algorithm is not available", e); + } + + Predicate<String> empty = s -> s == null || s.isEmpty(); + + if (!empty.test(trustStore) && !empty.test(trustStoreType)) { + char[] pwd = (trustStorePwd == null) ? EMPTY_CHARS : trustStorePwd.toCharArray(); + + KeyStore store = loadKeyStore("Trust", trustStore, trustStoreType, pwd); + + try { + trustMgrFactory.init(store); + } + catch (KeyStoreException e) { + throw new ClientError( + String.format("Trust key store provider of type [%s] is not available", trustStoreType), + e + ); + } + } + + return trustMgrFactory.getTrustManagers(); + } + + /** */ + private static KeyStore loadKeyStore(String lb, String path, String type, char[] pwd) { + KeyStore store; + + try { + store = KeyStore.getInstance(type); + } + catch (KeyStoreException e) { + throw new ClientError( + String.format("%s key store provider of type [%s] is not available", lb, type), + e + ); + } + + try (InputStream in = new FileInputStream(new File(path))) { + + store.load(in, pwd); + + return store; + } + catch (FileNotFoundException e) { + throw new ClientError(String.format("%s key store file [%s] does not exist", lb, path), e); + } + catch (NoSuchAlgorithmException e) { + throw new ClientError( + String.format("%s key store integrity check algorithm is not available", lb), + e + ); + } + catch (CertificateException e) { + throw new ClientError(String.format("Could not load certificate from %s key store", lb), e); + } + catch (IOException e) { + throw new ClientError(String.format("Could not read %s key store", lb), e); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java index ae1b7fa..3aee483 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.client.thin; +import java.nio.ByteBuffer; + /** * Server to client notification listener. */ @@ -30,5 +32,5 @@ interface NotificationListener { * @param payload Notification payload or {@code null} if there is no payload. * @param err Error. */ - public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err); + public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, ByteBuffer payload, Exception err); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java index 76af7f2..f9d5978 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.client.thin; -import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import java.nio.ByteBuffer; + +import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream; import org.apache.ignite.internal.binary.streams.BinaryInputStream; /** @@ -33,8 +35,8 @@ class PayloadInputChannel { /** * Constructor. */ - PayloadInputChannel(ClientChannel ch, byte[] payload) { - in = new BinaryHeapInputStream(payload); + PayloadInputChannel(ClientChannel ch, ByteBuffer payload) { + in = BinaryByteBufferInputStream.create(payload); this.ch = ch; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java index e7005be..195088d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.client.thin; import java.net.InetSocketAddress; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -31,13 +32,11 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -51,26 +50,21 @@ import org.apache.ignite.client.ClientException; import org.apache.ignite.client.IgniteClientFuture; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; +import org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientConnectionMultiplexer; import org.apache.ignite.internal.util.HostAndPortRange; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.NotNull; /** * Communication channel with failover and partition awareness. */ final class ReliableChannel implements AutoCloseable, NotificationListener { - /** Timeout to wait for executor service to shutdown (in milliseconds). */ - private static final long EXECUTOR_SHUTDOWN_TIMEOUT = 10_000L; - /** Do nothing helper function. */ private static final Consumer<Integer> DO_NOTHING = (v) -> {}; - /** Async runner thread name. */ - static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-init"; - /** Channel factory. */ - private final Function<ClientChannelConfiguration, ClientChannel> chFactory; + private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory; /** Client channel holders for each configured address. */ private volatile List<ClientChannelHolder> channels; @@ -96,19 +90,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { /** Listeners of channel close events. */ private final Collection<Consumer<ClientChannel>> channelCloseLsnrs = new CopyOnWriteArrayList<>(); - /** Async tasks thread pool. */ - private final ExecutorService asyncRunner = Executors.newSingleThreadExecutor( - new ThreadFactory() { - @Override public Thread newThread(@NotNull Runnable r) { - Thread thread = new Thread(r, ASYNC_RUNNER_THREAD_NAME); - - thread.setDaemon(true); - - return thread; - } - } - ); - /** Channels reinit was scheduled. */ private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean(); @@ -130,6 +111,9 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { /** Guard channels and curChIdx together. */ private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock(); + /** Connection manager. */ + private final ClientConnectionMultiplexer connMgr; + /** Cache addresses returned by {@code ThinClientAddressFinder}. */ private volatile String[] prevHostAddrs; @@ -137,9 +121,9 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { * Constructor. */ ReliableChannel( - Function<ClientChannelConfiguration, ClientChannel> chFactory, - ClientConfiguration clientCfg, - IgniteBinary binary + BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory, + ClientConfiguration clientCfg, + IgniteBinary binary ) { if (chFactory == null) throw new NullPointerException("chFactory"); @@ -153,20 +137,16 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled(); affinityCtx = new ClientCacheAffinityContext(binary); + + connMgr = new GridNioClientConnectionMultiplexer(clientCfg); + connMgr.start(); } /** {@inheritDoc} */ @Override public synchronized void close() { closed = true; - asyncRunner.shutdown(); - - try { - asyncRunner.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - } - catch (InterruptedException ignore) { - // No-op. - } + connMgr.stop(); List<ClientChannelHolder> holders = channels; @@ -430,7 +410,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { ClientChannel ch, ClientOperation op, long rsrcId, - byte[] payload, + ByteBuffer payload, Exception err ) { for (NotificationListener lsnr : notificationLsnrs) { @@ -579,7 +559,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { * Asynchronously try to establish a connection to all configured servers. */ private void initAllChannelsAsync() { - asyncRunner.submit( + ForkJoinPool.commonPool().submit( () -> { List<ClientChannelHolder> holders = channels; @@ -608,7 +588,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { if (scheduledChannelsReinit.compareAndSet(false, true)) { // If partition awareness is disabled then only schedule and wait for the default channel to fail. if (partitionAwarenessEnabled) - asyncRunner.submit(this::channelsInit); + ForkJoinPool.commonPool().submit(this::channelsInit); } } } @@ -867,6 +847,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { /** * Channels holder. */ + @SuppressWarnings("PackageVisibleInnerClass") // Visible for tests. class ClientChannelHolder { /** Channel configuration. */ private final ClientChannelConfiguration chCfg; @@ -937,7 +918,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { if (!ignoreThrottling && applyReconnectionThrottling()) throw new ClientConnectionException("Reconnect is not allowed due to applied throttling"); - ClientChannel channel = chFactory.apply(chCfg); + ClientChannel channel = chFactory.apply(chCfg, connMgr); if (channel.serverNodeId() != null) { channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged); @@ -1008,6 +989,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { /** * Get holders reference. For test purposes. */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests. List<ClientChannelHolder> getChannelHolders() { return channels; } @@ -1015,6 +997,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener { /** * Get node channels reference. For test purposes. */ + @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests. Map<UUID, ClientChannelHolder> getNodeChannels() { return nodeChannels; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java index 25df909..109c2a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java @@ -17,21 +17,9 @@ package org.apache.ignite.internal.client.thin; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; -import java.net.Socket; -import java.security.KeyManagementException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; @@ -44,22 +32,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Predicate; -import java.util.stream.Stream; -import javax.cache.configuration.Factory; -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSocket; -import javax.net.ssl.SSLSocketFactory; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509TrustManager; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.client.ClientAuthenticationException; import org.apache.ignite.client.ClientAuthorizationException; @@ -67,19 +41,20 @@ import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientException; import org.apache.ignite.client.ClientFeatureNotSupportedByServerException; import org.apache.ignite.client.ClientReconnectedException; -import org.apache.ignite.client.SslMode; -import org.apache.ignite.client.SslProtocol; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler; import org.apache.ignite.internal.binary.BinaryContext; -import org.apache.ignite.internal.binary.BinaryPrimitives; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryWriterExImpl; -import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream; import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; +import org.apache.ignite.internal.client.thin.io.ClientConnection; +import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; +import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler; +import org.apache.ignite.internal.client.thin.io.ClientMessageHandler; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener; import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; @@ -103,19 +78,14 @@ import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_7_0; import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.AUTHORIZATION; import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.BITMAP_FEATURES; import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.PARTITION_AWARENESS; -import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM; -import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE; /** * Implements {@link ClientChannel} over TCP. */ -class TcpClientChannel implements ClientChannel { +class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientConnectionStateHandler { /** Protocol version used by default on first connection attempt. */ private static final ProtocolVersion DEFAULT_VERSION = LATEST_VER; - /** Receiver thread prefix. */ - static final String RECEIVER_THREAD_PREFIX = "thin-client-channel#"; - /** Supported protocol versions. */ private static final Collection<ProtocolVersion> supportedVers = Arrays.asList( V1_7_0, @@ -128,30 +98,24 @@ class TcpClientChannel implements ClientChannel { V1_0_0 ); + /** Preallocated empty bytes. */ + public static final byte[] EMPTY_BYTES = new byte[0]; + /** Protocol context. */ - private ProtocolContext protocolCtx; + private volatile ProtocolContext protocolCtx; /** Server node ID. */ - private UUID srvNodeId; + private volatile UUID srvNodeId; /** Server topology version. */ - private AffinityTopologyVersion srvTopVer; + private volatile AffinityTopologyVersion srvTopVer; /** Channel. */ - private final Socket sock; - - /** Output stream. */ - private final OutputStream out; - - /** Data input. */ - private final ByteCountingDataInput dataInput; + private final ClientConnection sock; /** Request id. */ private final AtomicLong reqId = new AtomicLong(1); - /** Send lock. */ - private final Lock sndLock = new ReentrantLock(); - /** Pending requests. */ private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>(); @@ -167,14 +131,11 @@ class TcpClientChannel implements ClientChannel { /** Executor for async operation listeners. */ private final Executor asyncContinuationExecutor; - /** Receiver thread (processes incoming messages). */ - private Thread receiverThread; - /** Send/receive timeout in milliseconds. */ private final int timeout; /** Constructor. */ - TcpClientChannel(ClientChannelConfiguration cfg) + TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr) throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { validateConfiguration(cfg); @@ -183,21 +144,9 @@ class TcpClientChannel implements ClientChannel { timeout = cfg.getTimeout(); - try { - sock = createSocket(cfg); - - out = sock.getOutputStream(); - dataInput = new ByteCountingDataInput(sock.getInputStream()); - - handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes()); + sock = connMgr.open(cfg.getAddress(), this, this); - // Disable timeout on socket after handshake, instead, get future result with timeout in "receive" method. - if (timeout > 0) - sock.setSoTimeout(0); - } - catch (IOException e) { - throw handleIOError("addr=" + cfg.getAddress(), e); - } + handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes()); } /** {@inheritDoc} */ @@ -205,28 +154,25 @@ class TcpClientChannel implements ClientChannel { close(null); } + /** {@inheritDoc} */ + @Override public void onMessage(ByteBuffer buf) { + processNextMessage(buf); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(@Nullable Exception e) { + close(e); + } + /** * Close the channel with cause. */ private void close(Throwable cause) { if (closed.compareAndSet(false, true)) { - U.closeQuiet(dataInput); - U.closeQuiet(out); U.closeQuiet(sock); - sndLock.lock(); // Lock here to prevent creation of new pending requests. - - try { - for (ClientRequestFuture pendingReq : pendingReqs.values()) - pendingReq.onDone(new ClientConnectionException("Channel is closed", cause)); - - if (receiverThread != null) - receiverThread.interrupt(); - } - finally { - sndLock.unlock(); - } - + for (ClientRequestFuture pendingReq : pendingReqs.values()) + pendingReq.onDone(new ClientConnectionException("Channel is closed", cause)); } } @@ -251,7 +197,8 @@ class TcpClientChannel implements ClientChannel { ClientRequestFuture fut = send(op, payloadWriter); return receiveAsync(fut, payloadReader); - } catch (Throwable t) { + } + catch (Throwable t) { CompletableFuture<T> fut = new CompletableFuture<>(); fut.completeExceptionally(t); @@ -268,15 +215,10 @@ class TcpClientChannel implements ClientChannel { throws ClientException { long id = reqId.getAndIncrement(); - // Only one thread at a time can have access to write to the channel. - sndLock.lock(); - try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) { if (closed()) throw new ClientConnectionException("Channel is closed"); - initReceiverThread(); // Start the receiver thread with the first request. - ClientRequestFuture fut = new ClientRequestFuture(); pendingReqs.put(id, fut); @@ -292,7 +234,8 @@ class TcpClientChannel implements ClientChannel { req.writeInt(0, req.position() - 4); // Actual size. - write(req.array(), req.position()); + // arrayCopy is required, because buffer is pooled, and write is async. + write(req.arrayCopy(), req.position()); return fut; } @@ -301,9 +244,6 @@ class TcpClientChannel implements ClientChannel { throw t; } - finally { - sndLock.unlock(); - } } /** @@ -314,7 +254,7 @@ class TcpClientChannel implements ClientChannel { private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader) throws ClientException { try { - byte[] payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get(); + ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get(); if (payload == null || payloadReader == null) return null; @@ -338,7 +278,7 @@ class TcpClientChannel implements ClientChannel { pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(() -> { try { - byte[] payload = payloadFut.get(); + ByteBuffer payload = payloadFut.get(); if (payload == null || payloadReader == null) fut.complete(null); @@ -346,7 +286,8 @@ class TcpClientChannel implements ClientChannel { T res = payloadReader.apply(new PayloadInputChannel(this, payload)); fut.complete(res); } - } catch (Throwable t) { + } + catch (Throwable t) { fut.completeExceptionally(convertException(t)); } })); @@ -389,58 +330,29 @@ class TcpClientChannel implements ClientChannel { } /** - * Init and start receiver thread if it wasn't started before. - * - * Note: Method should be called only under external synchronization. - */ - private void initReceiverThread() { - if (receiverThread == null) { - Socket sock = this.sock; - - String sockInfo = sock == null ? null : sock.getInetAddress().getHostName() + ":" + sock.getPort(); - - receiverThread = new Thread(() -> { - try { - while (!closed()) - processNextMessage(); - } - catch (Throwable e) { - close(e); - } - }, RECEIVER_THREAD_PREFIX + sockInfo); - - receiverThread.setDaemon(true); - - receiverThread.start(); - } - } - - /** * Process next message from the input stream and complete corresponding future. */ - private void processNextMessage() throws ClientProtocolError, ClientConnectionException { - // blocking read a message header not to fall into a busy loop - int msgSize = dataInput.readInt(2048); - - if (msgSize <= 0) - throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize)); + private void processNextMessage(ByteBuffer buf) throws ClientProtocolError, ClientConnectionException { + BinaryInputStream dataInput = BinaryByteBufferInputStream.create(buf); - long bytesReadOnStartMsg = dataInput.totalBytesRead(); + if (protocolCtx == null) { + // Process handshake. + pendingReqs.remove(-1L).onDone(buf); + return; + } - long resId = dataInput.spinReadLong(); + long resId = dataInput.readLong(); int status = 0; ClientOperation notificationOp = null; - BinaryInputStream resIn; - if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) { - short flags = dataInput.spinReadShort(); + short flags = dataInput.readShort(); if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) { - long topVer = dataInput.spinReadLong(); - int minorTopVer = dataInput.spinReadInt(); + long topVer = dataInput.readLong(); + int minorTopVer = dataInput.readInt(); srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer); @@ -449,7 +361,7 @@ class TcpClientChannel implements ClientChannel { } if ((flags & ClientFlag.NOTIFICATION) != 0) { - short notificationCode = dataInput.spinReadShort(); + short notificationCode = dataInput.readShort(); notificationOp = ClientOperation.fromCode(notificationCode); @@ -458,28 +370,25 @@ class TcpClientChannel implements ClientChannel { } if ((flags & ClientFlag.ERROR) != 0) - status = dataInput.spinReadInt(); + status = dataInput.readInt(); } else - status = dataInput.spinReadInt(); + status = dataInput.readInt(); - int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg); + int hdrSize = dataInput.position(); + int msgSize = buf.limit(); - byte[] res = null; + ByteBuffer res = null; Exception err = null; if (status == 0) { if (msgSize > hdrSize) - res = dataInput.spinRead(msgSize - hdrSize); + res = buf; } - else if (status == ClientStatus.SECURITY_VIOLATION) { - dataInput.spinRead(msgSize - hdrSize); // Read message to the end. - + else if (status == ClientStatus.SECURITY_VIOLATION) err = new ClientAuthorizationException(); - } else { - resIn = new BinaryHeapInputStream(dataInput.spinRead(msgSize - hdrSize)); - - String errMsg = ClientUtils.createBinaryReader(null, resIn).readString(); + else { + String errMsg = ClientUtils.createBinaryReader(null, dataInput).readString(); err = new ClientServerError(errMsg, status, resId); } @@ -543,31 +452,21 @@ class TcpClientChannel implements ClientChannel { throw new IllegalArgumentException(error); } - /** Create socket. */ - private static Socket createSocket(ClientChannelConfiguration cfg) throws IOException { - Socket sock = cfg.getSslMode() == SslMode.REQUIRED ? - new ClientSslSocketFactory(cfg).create() : - new Socket(cfg.getAddress().getHostName(), cfg.getAddress().getPort()); - - sock.setTcpNoDelay(cfg.isTcpNoDelay()); - - if (cfg.getTimeout() > 0) - sock.setSoTimeout(cfg.getTimeout()); - - if (cfg.getSendBufferSize() > 0) - sock.setSendBufferSize(cfg.getSendBufferSize()); - - if (cfg.getReceiveBufferSize() > 0) - sock.setReceiveBufferSize(cfg.getReceiveBufferSize()); - - return sock; - } - /** Client handshake. */ private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs) throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { + ClientRequestFuture fut = new ClientRequestFuture(); + pendingReqs.put(-1L, fut); + handshakeReq(ver, user, pwd, userAttrs); - handshakeRes(ver, user, pwd, userAttrs); + + try { + ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get(); + handshakeRes(res, ver, user, pwd, userAttrs); + } + catch (IgniteCheckedException e) { + throw new ClientConnectionException(e.getMessage(), e); + } } /** Send handshake request. */ @@ -604,7 +503,7 @@ class TcpClientChannel implements ClientChannel { writer.out().writeInt(0, writer.out().position() - 4);// actual size - write(writer.array(), writer.out().position()); + write(writer.out().arrayCopy(), writer.out().position()); } } @@ -621,20 +520,15 @@ class TcpClientChannel implements ClientChannel { } /** Receive and handle handshake response. */ - private void handshakeRes(ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs) + private void handshakeRes(ByteBuffer buf, ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs) throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { - int resSize = dataInput.readInt(); - - if (resSize <= 0) - throw new ClientProtocolError(String.format("Invalid handshake response size: %s", resSize)); - - BinaryInputStream res = new BinaryHeapInputStream(dataInput.read(resSize)); + BinaryInputStream res = BinaryByteBufferInputStream.create(buf); try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) { boolean success = res.readBoolean(); if (success) { - byte[] features = new byte[0]; + byte[] features = EMPTY_BYTES; if (ProtocolContext.isFeatureSupported(proposedVer, BITMAP_FEATURES)) features = reader.readByteArray(); @@ -680,12 +574,13 @@ class TcpClientChannel implements ClientChannel { /** Write bytes to the output stream. */ private void write(byte[] bytes, int len) throws ClientConnectionException { + ByteBuffer buf = ByteBuffer.wrap(bytes, 0, len); + try { - out.write(bytes, 0, len); - out.flush(); + sock.send(buf); } - catch (IOException e) { - throw handleIOError(e); + catch (IgniteCheckedException e) { + throw new ClientConnectionException(e.getMessage(), e); } } @@ -705,424 +600,8 @@ class TcpClientChannel implements ClientChannel { } /** - * Auxiliary class to read byte buffers and numeric values, counting total bytes read. - * Numeric values are read in the little-endian byte order. - */ - private class ByteCountingDataInput implements AutoCloseable { - /** Input stream. */ - private final InputStream in; - - /** Total bytes read from the input stream. */ - private long totalBytesRead; - - /** Temporary buffer to read long, int and short values. */ - private final byte[] tmpBuf = new byte[Long.BYTES]; - - /** - * @param in Input stream. - */ - public ByteCountingDataInput(InputStream in) { - this.in = in; - } - - /** Read bytes from the input stream. */ - public byte[] read(int len) throws ClientConnectionException { - byte[] bytes = new byte[len]; - - read(bytes, len, 0); - - return bytes; - } - - /** Read bytes from the input stream. */ - public byte[] spinRead(int len) { - byte[] bytes = new byte[len]; - - read(bytes, len, Integer.MAX_VALUE); - - return bytes; - } - - /** - * Read bytes from the input stream to the buffer. - * - * @param bytes Bytes buffer. - * @param len Length. - * @param tryReadCnt Number of reads before falling into blocking read. - */ - public void read(byte[] bytes, int len, int tryReadCnt) throws ClientConnectionException { - int offset = 0; - - try { - while (offset < len) { - int toRead; - - if (tryReadCnt == 0) - toRead = len - offset; - else if ((toRead = Math.min(in.available(), len - offset)) == 0) { - tryReadCnt--; - - continue; - } - - int read = in.read(bytes, offset, toRead); - - if (read < 0) - throw handleIOError(null); - - offset += read; - totalBytesRead += read; - } - } - catch (IOException e) { - throw handleIOError(e); - } - } - - /** - * Read long value from the input stream. - */ - public long readLong() throws ClientConnectionException { - return readLong(0); - } - - /** - * Read long value from the input stream. - */ - public long spinReadLong() throws ClientConnectionException { - return readLong(Integer.MAX_VALUE); - } - - /** - * Read long value from the input stream. - * - * @param tryReadCnt Number of reads before falling into blocking read. - */ - private long readLong(int tryReadCnt) throws ClientConnectionException { - read(tmpBuf, Long.BYTES, tryReadCnt); - - return BinaryPrimitives.readLong(tmpBuf, 0); - } - - /** - * Read int value from the input stream. - */ - public int readInt() throws ClientConnectionException { - return readInt(0); - } - - /** - * Read int value from the input stream. - */ - public int spinReadInt() throws ClientConnectionException { - return readInt(Integer.MAX_VALUE); - } - - /** - * Read int value from the input stream. - * - * @param tryReadCnt Number of reads before falling into blocking read. - */ - private int readInt(int tryReadCnt) throws ClientConnectionException { - read(tmpBuf, Integer.BYTES, tryReadCnt); - - return BinaryPrimitives.readInt(tmpBuf, 0); - } - - /** - * Read short value from the input stream. - */ - public short readShort() throws ClientConnectionException { - return readShort(0); - } - - /** - * Read short value from the input stream. - */ - public short spinReadShort() throws ClientConnectionException { - return readShort(Integer.MAX_VALUE); - } - - /** - * Read short value from the input stream. - * - * @param tryReadCnt Number of reads before falling into blocking read. - */ - public short readShort(int tryReadCnt) throws ClientConnectionException { - read(tmpBuf, Short.BYTES, tryReadCnt); - - return BinaryPrimitives.readShort(tmpBuf, 0); - } - - /** - * Gets total bytes read from the input stream. - */ - public long totalBytesRead() { - return totalBytesRead; - } - - /** - * Close input stream. - */ - @Override public void close() throws IOException { - in.close(); - } - } - - /** * */ - private static class ClientRequestFuture extends GridFutureAdapter<byte[]> { - } - - /** SSL Socket Factory. */ - private static class ClientSslSocketFactory { - /** Trust manager ignoring all certificate checks. */ - private static final TrustManager ignoreErrorsTrustMgr = new X509TrustManager() { - @Override public X509Certificate[] getAcceptedIssuers() { - return null; - } - - @Override public void checkServerTrusted(X509Certificate[] arg0, String arg1) { - } - - @Override public void checkClientTrusted(X509Certificate[] arg0, String arg1) { - } - }; - - /** Config. */ - private final ClientChannelConfiguration cfg; - - /** Constructor. */ - ClientSslSocketFactory(ClientChannelConfiguration cfg) { - this.cfg = cfg; - } - - /** Create SSL socket. */ - SSLSocket create() throws IOException { - InetSocketAddress addr = cfg.getAddress(); - - SSLSocket sock = (SSLSocket)getSslSocketFactory(cfg).createSocket(addr.getHostName(), addr.getPort()); - - sock.setUseClientMode(true); - - sock.startHandshake(); - - return sock; - } - - /** Create SSL socket factory. */ - private static SSLSocketFactory getSslSocketFactory(ClientChannelConfiguration cfg) { - Factory<SSLContext> sslCtxFactory = cfg.getSslContextFactory(); - - if (sslCtxFactory != null) { - try { - return sslCtxFactory.create().getSocketFactory(); - } - catch (Exception e) { - throw new ClientError("SSL Context Factory failed", e); - } - } - - BiFunction<String, String, String> or = (val, dflt) -> val == null || val.isEmpty() ? dflt : val; - - String keyStore = or.apply( - cfg.getSslClientCertificateKeyStorePath(), - System.getProperty("javax.net.ssl.keyStore") - ); - - String keyStoreType = or.apply( - cfg.getSslClientCertificateKeyStoreType(), - or.apply(System.getProperty("javax.net.ssl.keyStoreType"), DFLT_STORE_TYPE) - ); - - String keyStorePwd = or.apply( - cfg.getSslClientCertificateKeyStorePassword(), - System.getProperty("javax.net.ssl.keyStorePassword") - ); - - String trustStore = or.apply( - cfg.getSslTrustCertificateKeyStorePath(), - System.getProperty("javax.net.ssl.trustStore") - ); - - String trustStoreType = or.apply( - cfg.getSslTrustCertificateKeyStoreType(), - or.apply(System.getProperty("javax.net.ssl.trustStoreType"), DFLT_STORE_TYPE) - ); - - String trustStorePwd = or.apply( - cfg.getSslTrustCertificateKeyStorePassword(), - System.getProperty("javax.net.ssl.trustStorePassword") - ); - - String algorithm = or.apply(cfg.getSslKeyAlgorithm(), DFLT_KEY_ALGORITHM); - - String proto = toString(cfg.getSslProtocol()); - - if (Stream.of(keyStore, keyStorePwd, keyStoreType, trustStore, trustStorePwd, trustStoreType) - .allMatch(s -> s == null || s.isEmpty()) - ) { - try { - return SSLContext.getDefault().getSocketFactory(); - } - catch (NoSuchAlgorithmException e) { - throw new ClientError("Default SSL context cryptographic algorithm is not available", e); - } - } - - KeyManager[] keyManagers = getKeyManagers(algorithm, keyStore, keyStoreType, keyStorePwd); - - TrustManager[] trustManagers = cfg.isSslTrustAll() ? - new TrustManager[] {ignoreErrorsTrustMgr} : - getTrustManagers(algorithm, trustStore, trustStoreType, trustStorePwd); - - try { - SSLContext sslCtx = SSLContext.getInstance(proto); - - sslCtx.init(keyManagers, trustManagers, null); - - return sslCtx.getSocketFactory(); - } - catch (NoSuchAlgorithmException e) { - throw new ClientError("SSL context cryptographic algorithm is not available", e); - } - catch (KeyManagementException e) { - throw new ClientError("Failed to create SSL Context", e); - } - } - - /** - * @return String representation of {@link SslProtocol} as required by {@link SSLContext}. - */ - private static String toString(SslProtocol proto) { - switch (proto) { - case TLSv1_1: - return "TLSv1.1"; - - case TLSv1_2: - return "TLSv1.2"; - - default: - return proto.toString(); - } - } - - /** */ - private static KeyManager[] getKeyManagers( - String algorithm, - String keyStore, - String keyStoreType, - String keyStorePwd - ) { - KeyManagerFactory keyMgrFactory; - - try { - keyMgrFactory = KeyManagerFactory.getInstance(algorithm); - } - catch (NoSuchAlgorithmException e) { - throw new ClientError("Key manager cryptographic algorithm is not available", e); - } - - Predicate<String> empty = s -> s == null || s.isEmpty(); - - if (!empty.test(keyStore) && !empty.test(keyStoreType)) { - char[] pwd = (keyStorePwd == null) ? new char[0] : keyStorePwd.toCharArray(); - - KeyStore store = loadKeyStore("Client", keyStore, keyStoreType, pwd); - - try { - keyMgrFactory.init(store, pwd); - } - catch (UnrecoverableKeyException e) { - throw new ClientError("Could not recover key store key", e); - } - catch (KeyStoreException e) { - throw new ClientError( - String.format("Client key store provider of type [%s] is not available", keyStoreType), - e - ); - } - catch (NoSuchAlgorithmException e) { - throw new ClientError("Client key store integrity check algorithm is not available", e); - } - } - - return keyMgrFactory.getKeyManagers(); - } - - /** */ - private static TrustManager[] getTrustManagers( - String algorithm, - String trustStore, - String trustStoreType, - String trustStorePwd - ) { - TrustManagerFactory trustMgrFactory; - - try { - trustMgrFactory = TrustManagerFactory.getInstance(algorithm); - } - catch (NoSuchAlgorithmException e) { - throw new ClientError("Trust manager cryptographic algorithm is not available", e); - } - - Predicate<String> empty = s -> s == null || s.isEmpty(); - - if (!empty.test(trustStore) && !empty.test(trustStoreType)) { - char[] pwd = (trustStorePwd == null) ? new char[0] : trustStorePwd.toCharArray(); - - KeyStore store = loadKeyStore("Trust", trustStore, trustStoreType, pwd); - - try { - trustMgrFactory.init(store); - } - catch (KeyStoreException e) { - throw new ClientError( - String.format("Trust key store provider of type [%s] is not available", trustStoreType), - e - ); - } - } - - return trustMgrFactory.getTrustManagers(); - } - - /** */ - private static KeyStore loadKeyStore(String lb, String path, String type, char[] pwd) { - KeyStore store; - - try { - store = KeyStore.getInstance(type); - } - catch (KeyStoreException e) { - throw new ClientError( - String.format("%s key store provider of type [%s] is not available", lb, type), - e - ); - } - - try (InputStream in = new FileInputStream(new File(path))) { - - store.load(in, pwd); - - return store; - } - catch (FileNotFoundException e) { - throw new ClientError(String.format("%s key store file [%s] does not exist", lb, path), e); - } - catch (NoSuchAlgorithmException e) { - throw new ClientError( - String.format("%s key store integrity check algorithm is not available", lb), - e - ); - } - catch (CertificateException e) { - throw new ClientError(String.format("Could not load certificate from %s key store", lb), e); - } - catch (IOException e) { - throw new ClientError(String.format("Could not read %s key store", lb), e); - } - } + private static class ClientRequestFuture extends GridFutureAdapter<ByteBuffer> { } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java index 9cea6a4..c67184a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java @@ -24,8 +24,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; import java.util.function.Consumer; -import java.util.function.Function; import org.apache.ignite.IgniteBinary; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.binary.BinaryObjectException; @@ -55,6 +55,7 @@ import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; +import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.MarshallerContext; @@ -101,8 +102,8 @@ public class TcpIgniteClient implements IgniteClient { * Constructor with custom channel factory. */ TcpIgniteClient( - Function<ClientChannelConfiguration, ClientChannel> chFactory, - ClientConfiguration cfg + BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory, + ClientConfiguration cfg ) throws ClientException { final ClientBinaryMetadataHandler metadataHandler = new ClientBinaryMetadataHandler(); @@ -116,18 +117,24 @@ public class TcpIgniteClient implements IgniteClient { ch = new ReliableChannel(chFactory, cfg, binary); - ch.channelsInit(); + try { + ch.channelsInit(); - ch.addChannelFailListener(() -> metadataHandler.onReconnect()); + ch.addChannelFailListener(() -> metadataHandler.onReconnect()); - transactions = new TcpClientTransactions(ch, marsh, - new ClientTransactionConfiguration(cfg.getTransactionConfiguration())); + transactions = new TcpClientTransactions(ch, marsh, + new ClientTransactionConfiguration(cfg.getTransactionConfiguration())); - cluster = new ClientClusterImpl(ch, marsh); + cluster = new ClientClusterImpl(ch, marsh); - compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup()); + compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup()); - services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup()); + services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup()); + } + catch (Exception e) { + ch.close(); + throw e; + } } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java similarity index 62% copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java index ae1b7fa..eed90b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java @@ -15,20 +15,25 @@ * limitations under the License. */ -package org.apache.ignite.internal.client.thin; +package org.apache.ignite.internal.client.thin.io; + +import java.nio.ByteBuffer; + +import org.apache.ignite.IgniteCheckedException; /** - * Server to client notification listener. + * Client connection: abstracts away sending and receiving messages. */ -interface NotificationListener { +public interface ClientConnection extends AutoCloseable { /** - * Accept notification. + * Sends a message. * - * @param ch Client channel which was notified. - * @param op Client operation. - * @param rsrcId Resource id. - * @param payload Notification payload or {@code null} if there is no payload. - * @param err Error. + * @param msg Message buffer. + */ + void send(ByteBuffer msg) throws IgniteCheckedException; + + /** + * Closes the connection. */ - public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err); + @Override void close(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java new file mode 100644 index 0000000..891e2b3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin.io; + +import java.net.InetSocketAddress; + +import org.apache.ignite.client.ClientConnectionException; + +/** + * Client connection multiplexer: manages multiple connections with a shared resource pool (worker threads, etc). + */ +public interface ClientConnectionMultiplexer { + /** + * Initializes this instance. + */ + void start(); + + /** + * Stops this instance. + */ + void stop(); + + /** + * Opens a new connection. + * + * @param addr Address. + * @param msgHnd Incoming message handler. + * @param stateHnd Connection state handler. + * @return Created connection. + * @throws ClientConnectionException when connection can't be established. + */ + ClientConnection open( + InetSocketAddress addr, + ClientMessageHandler msgHnd, + ClientConnectionStateHandler stateHnd) + throws ClientConnectionException; +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java similarity index 62% copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java index ae1b7fa..3f9481e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java @@ -15,20 +15,17 @@ * limitations under the License. */ -package org.apache.ignite.internal.client.thin; +package org.apache.ignite.internal.client.thin.io; + +import org.jetbrains.annotations.Nullable; /** - * Server to client notification listener. + * Handles thin client connection state. */ -interface NotificationListener { +public interface ClientConnectionStateHandler { /** - * Accept notification. - * - * @param ch Client channel which was notified. - * @param op Client operation. - * @param rsrcId Resource id. - * @param payload Notification payload or {@code null} if there is no payload. - * @param err Error. + * Handles connection loss. + * @param e Exception that caused the disconnect, can be null. */ - public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err); + void onDisconnected(@Nullable Exception e); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java new file mode 100644 index 0000000..06ab441 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin.io; + +import java.nio.ByteBuffer; + +/** + * Decodes thin client messages from partial buffers. + */ +public class ClientMessageDecoder { + /** */ + private byte[] data; + + /** */ + private int cnt = -4; + + /** */ + private int msgSize; + + /** + * Applies the next partial buffer. + * + * @param buf Buffer. + * @return Decoded message, or null when not yet complete. + */ + public byte[] apply(ByteBuffer buf) { + boolean msgReady = read(buf); + + return msgReady ? data : null; + } + + /** + * Reads the buffer. + * + * @param buf Buffer. + * @return True when a complete message has been received; false otherwise. + */ + @SuppressWarnings("DuplicatedCode") // A little duplication is better than a little dependency. + private boolean read(ByteBuffer buf) { + if (cnt < 0) { + for (; cnt < 0 && buf.hasRemaining(); cnt++) + msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt)); + + if (cnt < 0) + return false; + + data = new byte[msgSize]; + } + + assert data != null; + assert cnt >= 0; + assert msgSize > 0; + + int remaining = buf.remaining(); + + if (remaining > 0) { + int missing = msgSize - cnt; + + if (missing > 0) { + int len = Math.min(missing, remaining); + + buf.get(data, cnt, len); + + cnt += len; + } + } + + if (cnt == msgSize) { + cnt = -4; + msgSize = 0; + + return true; + } + + return false; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java similarity index 62% copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java index ae1b7fa..a52859f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java @@ -15,20 +15,17 @@ * limitations under the License. */ -package org.apache.ignite.internal.client.thin; +package org.apache.ignite.internal.client.thin.io; + +import java.nio.ByteBuffer; /** - * Server to client notification listener. + * Handles thin client responses and server -> client notifications. */ -interface NotificationListener { +public interface ClientMessageHandler { /** - * Accept notification. - * - * @param ch Client channel which was notified. - * @param op Client operation. - * @param rsrcId Resource id. - * @param payload Notification payload or {@code null} if there is no payload. - * @param err Error. + * Handles messages from the server. + * @param buf Buffer. */ - public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err); + void onMessage(ByteBuffer buf); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java new file mode 100644 index 0000000..e81d6f4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin.io.gridnioserver; + +import java.nio.ByteBuffer; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.client.thin.io.ClientConnection; +import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler; +import org.apache.ignite.internal.client.thin.io.ClientMessageHandler; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; + +/** + * Client connection. + */ +class GridNioClientConnection implements ClientConnection { + /** */ + static final int SES_META_CONN = GridNioSessionMetaKey.nextUniqueKey(); + + /** */ + private final GridNioSession ses; + + /** */ + private final ClientMessageHandler msgHnd; + + /** */ + private final ClientConnectionStateHandler stateHnd; + + /** + * Ctor. + * + * @param ses Session. + */ + public GridNioClientConnection(GridNioSession ses, + ClientMessageHandler msgHnd, + ClientConnectionStateHandler stateHnd) { + assert ses != null; + assert msgHnd != null; + assert stateHnd != null; + + this.ses = ses; + this.msgHnd = msgHnd; + this.stateHnd = stateHnd; + + ses.addMeta(SES_META_CONN, this); + } + + /** {@inheritDoc} */ + @Override public void send(ByteBuffer msg) throws IgniteCheckedException { + ses.sendNoFuture(msg, null); + } + + /** {@inheritDoc} */ + @Override public void close() { + ses.close(); + } + + /** + * Handles incoming message. + * + * @param msg Message. + */ + void onMessage(ByteBuffer msg) { + assert msg != null; + + msgHnd.onMessage(msg); + } + + /** + * Handles disconnect. + * + * @param e Exception that caused the disconnect. + */ + void onDisconnected(Exception e) { + stateHnd.onDisconnected(e); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java new file mode 100644 index 0000000..74a7025 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin.io.gridnioserver; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; +import javax.net.ssl.SSLContext; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.client.ClientConnectionException; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.internal.client.thin.ClientSslUtils; +import org.apache.ignite.internal.client.thin.io.ClientConnection; +import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; +import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler; +import org.apache.ignite.internal.client.thin.io.ClientMessageHandler; +import org.apache.ignite.internal.util.nio.GridNioCodecFilter; +import org.apache.ignite.internal.util.nio.GridNioFilter; +import org.apache.ignite.internal.util.nio.GridNioFuture; +import org.apache.ignite.internal.util.nio.GridNioFutureImpl; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter; +import org.apache.ignite.logger.NullLogger; + +/** + * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}. + */ +public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer { + /** Worker thread prefix. */ + private static final String THREAD_PREFIX = "thin-client-channel"; + + /** */ + private static final int CLIENT_MODE_PORT = -1; + + /** */ + private final GridNioServer<ByteBuffer> srv; + + /** */ + private final SSLContext sslCtx; + + /** + * Constructor. + * + * @param cfg Client config. + */ + public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) { + IgniteLogger gridLog = new NullLogger(); + + GridNioFilter[] filters; + + GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false); + + sslCtx = ClientSslUtils.getSslContext(cfg); + + if (sslCtx != null) { + GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog); + sslFilter.directMode(false); + filters = new GridNioFilter[] {codecFilter, sslFilter}; + } + else + filters = new GridNioFilter[] {codecFilter}; + + try { + srv = GridNioServer.<ByteBuffer>builder() + .port(CLIENT_MODE_PORT) + .listener(new GridNioClientListener()) + .filters(filters) + .logger(gridLog) + .selectorCount(1) // Using more selectors does not seem to improve performance. + .byteOrder(ByteOrder.nativeOrder()) + .directBuffer(true) + .directMode(false) + .igniteInstanceName("thinClient") + .serverName(THREAD_PREFIX) + .idleTimeout(Long.MAX_VALUE) + .socketReceiveBufferSize(cfg.getReceiveBufferSize()) + .socketSendBufferSize(cfg.getSendBufferSize()) + .tcpNoDelay(true) + .build(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public void start() { + srv.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() { + srv.stop(); + } + + /** {@inheritDoc} */ + @Override public ClientConnection open(InetSocketAddress addr, + ClientMessageHandler msgHnd, + ClientConnectionStateHandler stateHnd) + throws ClientConnectionException { + try { + SocketChannel ch = SocketChannel.open(); + ch.socket().connect(new InetSocketAddress(addr.getHostName(), addr.getPort()), Integer.MAX_VALUE); + + Map<Integer, Object> meta = new HashMap<>(); + GridNioFuture<?> sslHandshakeFut = null; + + if (sslCtx != null) { + sslHandshakeFut = new GridNioFutureImpl<>(null); + + meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut); + } + + GridNioSession ses = srv.createSession(ch, meta, false, null).get(); + + if (sslHandshakeFut != null) + sslHandshakeFut.get(); + + return new GridNioClientConnection(ses, msgHnd, stateHnd); + } + catch (Exception e) { + throw new ClientConnectionException(e.getMessage(), e); + } + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java new file mode 100644 index 0000000..f33835d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin.io.gridnioserver; + +import java.nio.ByteBuffer; + +import org.apache.ignite.failure.FailureType; +import org.apache.ignite.internal.util.nio.GridNioServerListener; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.jetbrains.annotations.Nullable; + +/** + * Client event listener. + */ +class GridNioClientListener implements GridNioServerListener<ByteBuffer> { + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + GridNioClientConnection conn = ses.meta(GridNioClientConnection.SES_META_CONN); + + // Conn can be null when connection fails during initialization in open method. + if (conn != null) + conn.onDisconnected(e); + } + + /** {@inheritDoc} */ + @Override public void onMessageSent(GridNioSession ses, ByteBuffer msg) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onMessage(GridNioSession ses, ByteBuffer msg) { + GridNioClientConnection conn = ses.meta(GridNioClientConnection.SES_META_CONN); + + assert conn != null : "Session must have an associated connection"; + + conn.onMessage(msg); + } + + /** {@inheritDoc} */ + @Override public void onSessionWriteTimeout(GridNioSession ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onSessionIdleTimeout(GridNioSession ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onFailure(FailureType failureType, Throwable failure) { + // No-op. + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java new file mode 100644 index 0000000..439c78a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin.io.gridnioserver; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder; +import org.apache.ignite.internal.util.nio.GridNioParser; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; +import org.jetbrains.annotations.Nullable; + +/** + * Client message parser. + */ +class GridNioClientParser implements GridNioParser { + /** */ + private static final int SES_META_DECODER = GridNioSessionMetaKey.nextUniqueKey(); + + /** {@inheritDoc} */ + @Override public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) { + ClientMessageDecoder decoder = ses.meta(SES_META_DECODER); + + if (decoder == null) { + decoder = new ClientMessageDecoder(); + + ses.addMeta(SES_META_DECODER, decoder); + } + + byte[] bytes = decoder.apply(buf); + + if (bytes == null) + return null; // Message is not yet completely received. + + // Thin client protocol is little-endian. ByteBuffer will handle conversion as necessary on big-endian systems. + return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) { + return (ByteBuffer)msg; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java b/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java index bbb2c87..2d75d5c 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java @@ -71,14 +71,20 @@ public class ConnectToStartingNodeTest extends AbstractThinClientTest { IgniteInternalFuture<IgniteClient> futStartClient = GridTestUtils.runAsync( () -> startClient(grid())); - // Server doesn't accept connection before discovery SPI started. - assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L)); + try { + // Server doesn't accept connection before discovery SPI started. + assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L)); - barrier.await(); + barrier.await(); - futStartGrid.get(); + futStartGrid.get(); - // Server accept connection after discovery SPI started. - assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L)); + // Server accept connection after discovery SPI started. + assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L)); + } + finally { + if (futStartClient.isDone()) + futStartClient.get().close(); + } } } diff --git a/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java b/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java index 0f0791b..c6def06 100644 --- a/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java +++ b/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java @@ -288,7 +288,7 @@ public class SslParametersTest extends GridCommonAbstractTest { cipherSuites, protocols, ClientConnectionException.class, - "Ignite cluster is unavailable" + "SSL handshake failed" ); } @@ -307,7 +307,7 @@ public class SslParametersTest extends GridCommonAbstractTest { this.cipherSuites = F.isEmpty(cipherSuites) ? null : cipherSuites; this.protocols = F.isEmpty(protocols) ? null : protocols; - GridTestUtils.assertThrows( + GridTestUtils.assertThrowsAnyCause( null, new Callable<Object>() { @Override public Object call() { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java index 61adf66..686a193 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java @@ -26,6 +26,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -35,6 +36,7 @@ import org.apache.ignite.client.ClientAuthorizationException; import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientException; import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.testframework.GridTestUtils; @@ -51,7 +53,8 @@ import static org.mockito.Mockito.mock; */ public class ReliableChannelTest { /** Mock factory for creating new channels. */ - private final Function<ClientChannelConfiguration, ClientChannel> chFactory = cfg -> new TestClientChannel(); + private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory = + (cfg, hnd) -> new TestClientChannel(); /** */ private final String[] dfltAddrs = new String[]{"127.0.0.1:10800", "127.0.0.1:10801", "127.0.0.1:10802"}; @@ -259,7 +262,7 @@ public class ReliableChannelTest { .setAddresses(dfltAddrs) .setPartitionAwarenessEnabled(true); - ReliableChannel rc = new ReliableChannel(cfg -> new TestFailureClientChannel(), ccfg, null); + ReliableChannel rc = new ReliableChannel((cfg, hnd) -> new TestFailureClientChannel(), ccfg, null); rc.channelsInit(); } @@ -302,7 +305,7 @@ public class ReliableChannelTest { // Emulate cluster is down after TcpClientChannel#send operation. AtomicInteger step = new AtomicInteger(); - ReliableChannel rc = new ReliableChannel(cfg -> { + ReliableChannel rc = new ReliableChannel((cfg, hnd) -> { if (step.getAndIncrement() == 0) return new TestAsyncServiceFailureClientChannel(); else diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java index dd716d6..7eda71f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java @@ -36,6 +36,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; @@ -185,11 +186,11 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo * @param chIdxs Channels to wait for initialization. */ protected void initClient(ClientConfiguration clientCfg, int... chIdxs) throws IgniteInterruptedCheckedException { - client = new TcpIgniteClient(cfg -> { + client = new TcpIgniteClient((cfg, hnd) -> { try { log.info("Establishing connection to " + cfg.getAddress()); - TcpClientChannel ch = new TestTcpClientChannel(cfg); + TcpClientChannel ch = new TestTcpClientChannel(cfg, hnd); log.info("Channel initialized: " + ch); @@ -323,8 +324,8 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo /** * @param cfg Config. */ - public TestTcpClientChannel(ClientChannelConfiguration cfg) { - super(cfg); + public TestTcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer hnd) { + super(cfg, hnd); this.cfg = cfg; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java index 7dc6222..2909c4e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java @@ -23,13 +23,13 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.GridTestUtils; import org.junit.Test; -import static org.apache.ignite.internal.client.thin.ReliableChannel.ASYNC_RUNNER_THREAD_NAME; -import static org.apache.ignite.internal.client.thin.TcpClientChannel.RECEIVER_THREAD_PREFIX; - /** * Test resource releasing by thin client. */ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientAbstractPartitionAwarenessTest { + /** Worker thread prefix. */ + private static final String THREAD_PREFIX = "thin-client-channel"; + /** * Test that resources are correctly released after closing client with partition awareness. */ @@ -46,15 +46,13 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA assertFalse(channels[0].isClosed()); assertFalse(channels[1].isClosed()); - assertEquals(1, threadsCount(ASYNC_RUNNER_THREAD_NAME)); - assertEquals(2, threadsCount(RECEIVER_THREAD_PREFIX)); + assertEquals(1, threadsCount(THREAD_PREFIX)); client.close(); assertTrue(channels[0].isClosed()); assertTrue(channels[1].isClosed()); - assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(ASYNC_RUNNER_THREAD_NAME) == 0, 1_000L)); - assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(RECEIVER_THREAD_PREFIX) == 0, 1_000L)); + assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L)); } /** @@ -68,7 +66,7 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA for (long id : threadIds) { ThreadInfo info = U.getThreadMx().getThreadInfo(id); - if (info != null && info.getThreadState() != Thread.State.TERMINATED && info.getThreadName().startsWith(name)) + if (info != null && info.getThreadState() != Thread.State.TERMINATED && info.getThreadName().contains(name)) cnt++; }