This is an automated email from the ASF dual-hosted git repository. alexpl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b861813 IGNITE-11685 Java thin client: Handle multiple async requests in parallel - Fixes #6595. b861813 is described below commit b861813bad67ec2fd4a545975d47120da535ff23 Author: Aleksey Plekhanov <plehanov.a...@gmail.com> AuthorDate: Fri Jul 12 16:04:22 2019 +0300 IGNITE-11685 Java thin client: Handle multiple async requests in parallel - Fixes #6595. --- .../ignite/client/ClientConnectionException.java | 18 +- .../ignite/internal/client/thin/ClientChannel.java | 17 +- .../client/thin/ClientFieldsQueryPager.java | 7 +- .../internal/client/thin/ClientQueryPager.java | 7 +- .../ignite/internal/client/thin/ClientUtils.java | 11 +- .../internal/client/thin/GenericQueryPager.java | 42 ++-- .../client/thin/PayloadInputChannel.java} | 39 +-- .../internal/client/thin/PayloadOutputChannel.java | 62 +++++ .../internal/client/thin/ProtocolVersion.java | 9 + .../internal/client/thin/ReliableChannel.java | 121 ++++----- .../internal/client/thin/TcpClientCache.java | 98 +++++--- .../internal/client/thin/TcpClientChannel.java | 276 +++++++++++++++++---- .../internal/client/thin/TcpIgniteClient.java | 56 ++--- .../org/apache/ignite/client/AsyncChannelTest.java | 198 +++++++++++++++ .../org/apache/ignite/client/ClientTestSuite.java | 3 +- 15 files changed, 717 insertions(+), 247 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java b/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java index 1ec096c..58ca153 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java +++ b/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java @@ -24,22 +24,22 @@ public class ClientConnectionException extends ClientException { /** Serial version uid. */ private static final long serialVersionUID = 0L; - /** Message. */ - private static final String MSG = "Ignite cluster is unavailable"; - /** - * Default constructor. + * Constructs a new exception with the specified detail message. + * + * @param msg the detail message. */ - public ClientConnectionException() { - super(MSG); + public ClientConnectionException(String msg) { + super(msg); } /** - * Constructs a new exception with the specified cause. + * Constructs a new exception with the specified cause and detail message. * + * @param msg the detail message. * @param cause the cause. */ - public ClientConnectionException(Throwable cause) { - super(MSG, cause); + public ClientConnectionException(String msg, Throwable cause) { + super(msg, cause); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java index 9e97b34..de7062e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannel.java @@ -19,30 +19,23 @@ package org.apache.ignite.internal.client.thin; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.binary.streams.BinaryOutputStream; -import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientAuthorizationException; +import org.apache.ignite.client.ClientConnectionException; /** * Processing thin client requests and responses. */ interface ClientChannel extends AutoCloseable { /** + * Send request and handle response for client operation. + * * @param op Operation. * @param payloadWriter Payload writer to stream or {@code null} if request has no payload. - * @return Request ID. - */ - public long send(ClientOperation op, Consumer<BinaryOutputStream> payloadWriter) throws ClientConnectionException; - - /** - * @param op Operation. - * @param reqId ID of the request to receive the response for. * @param payloadReader Payload reader from stream. * @return Received operation payload or {@code null} if response has no payload. */ - public <T> T receive(ClientOperation op, long reqId, Function<BinaryInputStream, T> payloadReader) - throws ClientConnectionException, ClientAuthorizationException; + public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, + Function<PayloadInputChannel, T> payloadReader) throws ClientConnectionException, ClientAuthorizationException; /** * @return Server version. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java index 82a4ce7..f7c080f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientFieldsQueryPager.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.function.Consumer; import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.binary.streams.BinaryOutputStream; /** * Fields query pager. @@ -42,7 +41,7 @@ class ClientFieldsQueryPager extends GenericQueryPager<List<?>> implements Field ReliableChannel ch, ClientOperation qryOp, ClientOperation pageQryOp, - Consumer<BinaryOutputStream> qryWriter, + Consumer<PayloadOutputChannel> qryWriter, boolean keepBinary, ClientBinaryMarshaller marsh ) { @@ -54,7 +53,9 @@ class ClientFieldsQueryPager extends GenericQueryPager<List<?>> implements Field } /** {@inheritDoc} */ - @Override Collection<List<?>> readEntries(BinaryInputStream in) { + @Override Collection<List<?>> readEntries(PayloadInputChannel payloadCh) { + BinaryInputStream in = payloadCh.in(); + if (!hasFirstPage()) fieldNames = new ArrayList<>(ClientUtils.collection(in, ignored -> (String)serDes.readObject(in, keepBinary))); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java index 1a373b4..995bdaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientQueryPager.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.function.Consumer; import javax.cache.Cache; import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.binary.streams.BinaryOutputStream; /** * Client query pager. @@ -38,7 +37,7 @@ class ClientQueryPager<K, V> extends GenericQueryPager<Cache.Entry<K, V>> { ReliableChannel ch, ClientOperation qryOp, ClientOperation pageQryOp, - Consumer<BinaryOutputStream> qryWriter, + Consumer<PayloadOutputChannel> qryWriter, boolean keepBinary, ClientBinaryMarshaller marsh ) { @@ -50,7 +49,9 @@ class ClientQueryPager<K, V> extends GenericQueryPager<Cache.Entry<K, V>> { } /** {@inheritDoc} */ - @Override Collection<Cache.Entry<K, V>> readEntries(BinaryInputStream in) { + @Override Collection<Cache.Entry<K, V>> readEntries(PayloadInputChannel paloadCh) { + BinaryInputStream in = paloadCh.in(); + return ClientUtils.collection( in, ignored -> new ClientCacheEntry<>(serDes.readObject(in, keepBinary), serDes.readObject(in, keepBinary)) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java index 8e2d61a..3175cfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientUtils.java @@ -55,9 +55,8 @@ import org.apache.ignite.internal.binary.BinarySchema; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; -import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; -import static org.apache.ignite.internal.processors.platform.client.ClientConnectionContext.VER_1_2_0; +import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_2_0; /** * Shared serialization/deserialization utils. @@ -234,7 +233,7 @@ final class ClientUtils { } /** Serialize configuration to stream. */ - void cacheConfiguration(ClientCacheConfiguration cfg, BinaryOutputStream out, ClientListenerProtocolVersion ver) { + void cacheConfiguration(ClientCacheConfiguration cfg, BinaryOutputStream out, ProtocolVersion ver) { try (BinaryRawWriterEx writer = new BinaryWriterExImpl(marsh.context(), out, null, null)) { int origPos = out.position(); @@ -313,7 +312,7 @@ final class ClientUtils { w.writeBoolean(qf.isNotNull()); w.writeObject(qf.getDefaultValue()); - if (ver.compareTo(VER_1_2_0) >= 0) { + if (ver.compareTo(V1_2_0) >= 0) { w.writeInt(qf.getPrecision()); w.writeInt(qf.getScale()); } @@ -349,7 +348,7 @@ final class ClientUtils { } /** Deserialize configuration from stream. */ - ClientCacheConfiguration cacheConfiguration(BinaryInputStream in, ClientListenerProtocolVersion ver) + ClientCacheConfiguration cacheConfiguration(BinaryInputStream in, ProtocolVersion ver) throws IOException { try (BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), in, null, true)) { reader.readInt(); // Do not need length to read data. The protocol defines fixed configuration layout. @@ -394,7 +393,7 @@ final class ClientUtils { .setKeyFieldName(reader.readString()) .setValueFieldName(reader.readString()); - boolean isCliVer1_2 = ver.compareTo(VER_1_2_0) >= 0; + boolean isCliVer1_2 = ver.compareTo(V1_2_0) >= 0; Collection<QueryField> qryFields = ClientUtils.collection( in, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java index 30c73cf..90ab568 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/GenericQueryPager.java @@ -21,12 +21,9 @@ import java.util.Collection; import java.util.function.Consumer; import org.apache.ignite.client.ClientException; import org.apache.ignite.client.ClientReconnectedException; -import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.binary.streams.BinaryOutputStream; -import org.apache.ignite.internal.processors.platform.client.ClientStatus; /** - * Generic query pager. Override {@link this#readResult(BinaryInputStream)} to make it specific. + * Generic query pager. Override {@link this#readResult(PayloadInputChannel)} to make it specific. */ abstract class GenericQueryPager<T> implements QueryPager<T> { /** Query op. */ @@ -36,7 +33,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> { private final ClientOperation pageQryOp; /** Query writer. */ - private final Consumer<BinaryOutputStream> qryWriter; + private final Consumer<PayloadOutputChannel> qryWriter; /** Channel. */ private final ReliableChannel ch; @@ -50,12 +47,15 @@ abstract class GenericQueryPager<T> implements QueryPager<T> { /** Cursor id. */ private Long cursorId = null; + /** Client channel on first query page. */ + private ClientChannel clientCh; + /** Constructor. */ GenericQueryPager( ReliableChannel ch, ClientOperation qryOp, ClientOperation pageQryOp, - Consumer<BinaryOutputStream> qryWriter + Consumer<PayloadOutputChannel> qryWriter ) { this.ch = ch; this.qryOp = qryOp; @@ -75,7 +75,7 @@ abstract class GenericQueryPager<T> implements QueryPager<T> { @Override public void close() throws Exception { // Close cursor only if the server has more pages: the server closes cursor automatically on last page if (cursorId != null && hasNext) - ch.request(ClientOperation.RESOURCE_CLOSE, req -> req.writeLong(cursorId)); + ch.request(ClientOperation.RESOURCE_CLOSE, req -> req.out().writeLong(cursorId)); } /** {@inheritDoc} */ @@ -95,6 +95,8 @@ abstract class GenericQueryPager<T> implements QueryPager<T> { hasNext = true; cursorId = null; + + clientCh = null; } /** @@ -102,12 +104,12 @@ abstract class GenericQueryPager<T> implements QueryPager<T> { * cursor ID and trailing "has next page" flag. * Use {@link this#hasFirstPage} flag to differentiate between the initial query and page query responses. */ - abstract Collection<T> readEntries(BinaryInputStream in); + abstract Collection<T> readEntries(PayloadInputChannel in); /** */ - private Collection<T> readResult(BinaryInputStream in) { + private Collection<T> readResult(PayloadInputChannel payloadCh) { if (!hasFirstPage) { - long resCursorId = in.readLong(); + long resCursorId = payloadCh.in().readLong(); if (cursorId != null) { if (cursorId != resCursorId) @@ -115,13 +117,16 @@ abstract class GenericQueryPager<T> implements QueryPager<T> { String.format("Expected cursor [%s] but received cursor [%s]", cursorId, resCursorId) ); } - else + else { cursorId = resCursorId; + + clientCh = payloadCh.clientChannel(); + } } - Collection<T> res = readEntries(in); + Collection<T> res = readEntries(payloadCh); - hasNext = in.readBoolean(); + hasNext = payloadCh.in().readBoolean(); hasFirstPage = true; @@ -130,16 +135,13 @@ abstract class GenericQueryPager<T> implements QueryPager<T> { /** Get page. */ private Collection<T> queryPage() throws ClientException { - try { - return ch.service(pageQryOp, req -> req.writeLong(cursorId), this::readResult); - } - catch (ClientServerError ex) { - if (ex.getCode() == ClientStatus.RESOURCE_DOES_NOT_EXIST) { + return ch.service(pageQryOp, req -> { + if (clientCh != req.clientChannel()) { throw new ClientReconnectedException("Client was reconnected in the middle of results fetch, " + "query results can be inconsistent, please retry the query."); } - throw ex; - } + req.out().writeLong(cursorId); + }, this::readResult); } } diff --git a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java similarity index 54% copy from modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java index 1ec096c..76af7f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/client/ClientConnectionException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java @@ -15,31 +15,40 @@ * limitations under the License. */ -package org.apache.ignite.client; +package org.apache.ignite.internal.client.thin; + +import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryInputStream; /** - * Indicates all the Ignite servers specified in the client configuration are no longer available. + * Thin client payload input channel. */ -public class ClientConnectionException extends ClientException { - /** Serial version uid. */ - private static final long serialVersionUID = 0L; +class PayloadInputChannel { + /** Client channel. */ + private final ClientChannel ch; + + /** Input stream. */ + private final BinaryInputStream in; - /** Message. */ - private static final String MSG = "Ignite cluster is unavailable"; + /** + * Constructor. + */ + PayloadInputChannel(ClientChannel ch, byte[] payload) { + in = new BinaryHeapInputStream(payload); + this.ch = ch; + } /** - * Default constructor. + * Gets client channel. */ - public ClientConnectionException() { - super(MSG); + public ClientChannel clientChannel() { + return ch; } /** - * Constructs a new exception with the specified cause. - * - * @param cause the cause. + * Gets input stream. */ - public ClientConnectionException(Throwable cause) { - super(MSG, cause); + public BinaryInputStream in() { + return in; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadOutputChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadOutputChannel.java new file mode 100644 index 0000000..cd2b29c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadOutputChannel.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.binary.streams.BinaryOutputStream; + +/** + * Thin client payload output channel. + */ +class PayloadOutputChannel implements AutoCloseable { + /** Initial output stream buffer capacity. */ + private static final int INITIAL_BUFFER_CAPACITY = 1024; + + /** Client channel. */ + private final ClientChannel ch; + + /** Output stream. */ + private final BinaryOutputStream out; + + /** + * Constructor. + */ + PayloadOutputChannel(ClientChannel ch) { + out = new BinaryHeapOutputStream(INITIAL_BUFFER_CAPACITY); + this.ch = ch; + } + + /** + * Gets client channel. + */ + public ClientChannel clientChannel() { + return ch; + } + + /** + * Gets output stream. + */ + public BinaryOutputStream out() { + return out; + } + + /** {@inheritDoc} */ + @Override public void close() { + out.close(); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java index 2e84e36..aaf7eed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java @@ -19,6 +19,15 @@ package org.apache.ignite.internal.client.thin; /** Thin client protocol version. */ public final class ProtocolVersion implements Comparable<ProtocolVersion> { + /** Protocol version: 1.2.0. */ + public static final ProtocolVersion V1_2_0 = new ProtocolVersion((short)1, (short)2, (short)0); + + /** Protocol version: 1.1.0. */ + public static final ProtocolVersion V1_1_0 = new ProtocolVersion((short)1, (short)1, (short)0); + + /** Protocol version 1.0.0. */ + public static final ProtocolVersion V1_0_0 = new ProtocolVersion((short)1, (short)0, (short)0); + /** Major. */ private final short major; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java index 8edcc16..2e8deef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java @@ -24,8 +24,6 @@ import java.util.Deque; import java.util.LinkedList; import java.util.List; import java.util.Random; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -35,10 +33,9 @@ import org.apache.ignite.client.ClientConnectionException; import org.apache.ignite.client.ClientException; import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; -import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.binary.streams.BinaryOutputStream; import org.apache.ignite.internal.util.HostAndPortRange; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Adds failover abd thread-safety to {@link ClientChannel}. @@ -47,8 +44,8 @@ final class ReliableChannel implements AutoCloseable { /** Raw channel. */ private final Function<ClientChannelConfiguration, Result<ClientChannel>> chFactory; - /** Service lock. */ - private final Lock svcLock = new ReentrantLock(); + /** Servers count. */ + private final int srvCnt; /** Primary server. */ private InetSocketAddress primary; @@ -57,11 +54,14 @@ final class ReliableChannel implements AutoCloseable { private final Deque<InetSocketAddress> backups = new LinkedList<>(); /** Channel. */ - private ClientChannel ch = null; + private ClientChannel ch; /** Ignite config. */ private final ClientConfiguration clientCfg; + /** Channel is closed. */ + private boolean closed; + /** * Constructor. */ @@ -80,11 +80,13 @@ final class ReliableChannel implements AutoCloseable { List<InetSocketAddress> addrs = parseAddresses(clientCfg.getAddresses()); + srvCnt = addrs.size(); + primary = addrs.get(new Random().nextInt(addrs.size())); // we already verified there is at least one address for (InetSocketAddress a : addrs) { if (a != primary) - this.backups.add(a); + backups.add(a); } ClientConnectionException lastEx = null; @@ -97,7 +99,7 @@ final class ReliableChannel implements AutoCloseable { } catch (ClientConnectionException e) { lastEx = e; - changeServer(); + rollAddress(); } } @@ -105,7 +107,9 @@ final class ReliableChannel implements AutoCloseable { } /** {@inheritDoc} */ - @Override public void close() throws Exception { + @Override public synchronized void close() throws Exception { + closed = true; + if (ch != null) { ch.close(); @@ -114,58 +118,40 @@ final class ReliableChannel implements AutoCloseable { } /** - * Send request and handle response. The method is synchronous and single-threaded. + * Send request and handle response. */ public <T> T service( ClientOperation op, - Consumer<BinaryOutputStream> payloadWriter, - Function<BinaryInputStream, T> payloadReader + Consumer<PayloadOutputChannel> payloadWriter, + Function<PayloadInputChannel, T> payloadReader ) throws ClientException { ClientConnectionException failure = null; - T res = null; - - int totalSrvs = 1 + backups.size(); - - svcLock.lock(); - try { - for (int i = 0; i < totalSrvs; i++) { - try { - if (ch == null) - ch = chFactory.apply(new ClientChannelConfiguration(clientCfg).setAddress(primary)).get(); + for (int i = 0; i < srvCnt; i++) { + ClientChannel ch = null; - long id = ch.send(op, payloadWriter); - - res = ch.receive(op, id, payloadReader); - - failure = null; + try { + ch = channel(); - break; - } - catch (ClientConnectionException e) { - if (failure == null) - failure = e; - else - failure.addSuppressed(e); + return ch.service(op, payloadWriter, payloadReader); + } + catch (ClientConnectionException e) { + if (failure == null) + failure = e; + else + failure.addSuppressed(e); - changeServer(); - } + changeServer(ch); } } - finally { - svcLock.unlock(); - } - - if (failure != null) - throw failure; - return res; + throw failure; } /** * Send request without payload and handle response. */ - public <T> T service(ClientOperation op, Function<BinaryInputStream, T> payloadReader) + public <T> T service(ClientOperation op, Function<PayloadInputChannel, T> payloadReader) throws ClientException { return service(op, null, payloadReader); } @@ -173,18 +159,11 @@ final class ReliableChannel implements AutoCloseable { /** * Send request and handle response without payload. */ - public void request(ClientOperation op, Consumer<BinaryOutputStream> payloadWriter) throws ClientException { + public void request(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter) throws ClientException { service(op, payloadWriter, null); } /** - * @return Server version. - */ - public ProtocolVersion serverVersion() { - return ch.serverVersion(); - } - - /** * @return host:port_range address lines parsed as {@link InetSocketAddress}. */ private static List<InetSocketAddress> parseAddresses(String[] addrs) throws ClientException { @@ -216,19 +195,41 @@ final class ReliableChannel implements AutoCloseable { } /** */ - private void changeServer() { + private synchronized ClientChannel channel() { + if (closed) + throw new ClientException("Channel is closed"); + + if (ch == null) { + try { + ch = chFactory.apply(new ClientChannelConfiguration(clientCfg).setAddress(primary)).get(); + } + catch (ClientConnectionException e) { + rollAddress(); + + throw e; + } + } + + return ch; + } + + /** */ + private void rollAddress() { if (!backups.isEmpty()) { backups.addLast(primary); primary = backups.removeFirst(); } + } - try { - ch.close(); - } - catch (Exception ignored) { - } + /** */ + private synchronized void changeServer(ClientChannel oldCh) { + if (oldCh == ch && ch != null) { + rollAddress(); + + U.closeQuiet(ch); - ch = null; + ch = null; + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java index 3e3204c..5771b72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java @@ -40,7 +40,6 @@ import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; import static java.util.AbstractMap.SimpleEntry; -import static org.apache.ignite.internal.processors.platform.client.ClientConnectionContext.DEFAULT_VER; /** * Implementation of {@link ClientCache} over TCP protocol. @@ -83,7 +82,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_GET, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); + writeObject(req, key); }, this::readObject ); @@ -101,8 +100,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_PUT, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); - serDes.writeObject(req, val); + writeObject(req, key); + writeObject(req, val); } ); } @@ -116,9 +115,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_CONTAINS_KEY, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); + writeObject(req, key); }, - BinaryInputStream::readBoolean + res -> res.in().readBoolean() ); } @@ -134,7 +133,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { this::writeCacheInfo, res -> { try { - return serDes.cacheConfiguration(res, DEFAULT_VER); + return serDes.cacheConfiguration(res.in(), res.clientChannel().serverVersion()); } catch (IOException e) { return null; @@ -149,9 +148,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_GET_SIZE, req -> { writeCacheInfo(req); - ClientUtils.collection(peekModes, req, (out, m) -> out.writeByte((byte)m.ordinal())); + ClientUtils.collection(peekModes, req.out(), (out, m) -> out.writeByte((byte)m.ordinal())); }, - res -> (int)res.readLong() + res -> (int)res.in().readLong() ); } @@ -167,10 +166,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_GET_ALL, req -> { writeCacheInfo(req); - ClientUtils.collection(keys, req, serDes::writeObject); + ClientUtils.collection(keys, req.out(), serDes::writeObject); }, res -> ClientUtils.collection( - res, + res.in(), in -> new SimpleEntry<K, V>(readObject(in), readObject(in)) ) ).stream().collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); @@ -190,7 +189,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { writeCacheInfo(req); ClientUtils.collection( map.entrySet(), - req, + req.out(), (out, e) -> { serDes.writeObject(out, e.getKey()); serDes.writeObject(out, e.getValue()); @@ -214,11 +213,11 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_REPLACE_IF_EQUALS, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); - serDes.writeObject(req, oldVal); - serDes.writeObject(req, newVal); + writeObject(req, key); + writeObject(req, oldVal); + writeObject(req, newVal); }, - BinaryInputStream::readBoolean + res -> res.in().readBoolean() ); } @@ -234,10 +233,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_REPLACE, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); - serDes.writeObject(req, val); + writeObject(req, key); + writeObject(req, val); }, - BinaryInputStream::readBoolean + res -> res.in().readBoolean() ); } @@ -250,9 +249,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_REMOVE_KEY, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); + writeObject(req, key); }, - BinaryInputStream::readBoolean + res -> res.in().readBoolean() ); } @@ -268,10 +267,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_REMOVE_IF_EQUALS, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); - serDes.writeObject(req, oldVal); + writeObject(req, key); + writeObject(req, oldVal); }, - BinaryInputStream::readBoolean + res -> res.in().readBoolean() ); } @@ -287,7 +286,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_REMOVE_KEYS, req -> { writeCacheInfo(req); - ClientUtils.collection(keys, req, serDes::writeObject); + ClientUtils.collection(keys, req.out(), serDes::writeObject); } ); } @@ -309,8 +308,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_GET_AND_PUT, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); - serDes.writeObject(req, val); + writeObject(req, key); + writeObject(req, val); }, this::readObject ); @@ -325,7 +324,7 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_GET_AND_REMOVE, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); + writeObject(req, key); }, this::readObject ); @@ -343,8 +342,8 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_GET_AND_REPLACE, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); - serDes.writeObject(req, val); + writeObject(req, key); + writeObject(req, val); }, this::readObject ); @@ -362,10 +361,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { ClientOperation.CACHE_PUT_IF_ABSENT, req -> { writeCacheInfo(req); - serDes.writeObject(req, key); - serDes.writeObject(req, val); + writeObject(req, key); + writeObject(req, val); }, - BinaryInputStream::readBoolean + res -> res.in().readBoolean() ); } @@ -425,9 +424,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { if (qry == null) throw new NullPointerException("qry"); - Consumer<BinaryOutputStream> qryWriter = out -> { - writeCacheInfo(out); - serDes.write(qry, out); + Consumer<PayloadOutputChannel> qryWriter = payloadCh -> { + writeCacheInfo(payloadCh); + serDes.write(qry, payloadCh.out()); }; return new ClientFieldsQueryCursor<>(new ClientFieldsQueryPager( @@ -442,8 +441,10 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { /** Handle scan query. */ private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) { - Consumer<BinaryOutputStream> qryWriter = out -> { - writeCacheInfo(out); + Consumer<PayloadOutputChannel> qryWriter = payloadCh -> { + writeCacheInfo(payloadCh); + + BinaryOutputStream out = payloadCh.out(); if (qry.getFilter() == null) out.writeByte(GridBinaryMarshaller.NULL); @@ -469,8 +470,11 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { /** Handle SQL query. */ private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) { - Consumer<BinaryOutputStream> qryWriter = out -> { - writeCacheInfo(out); + Consumer<PayloadOutputChannel> qryWriter = payloadCh -> { + writeCacheInfo(payloadCh); + + BinaryOutputStream out = payloadCh.out(); + serDes.writeObject(out, qry.getType()); serDes.writeObject(out, qry.getSql()); ClientUtils.collection(qry.getArgs(), out, serDes::writeObject); @@ -492,7 +496,9 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { } /** Write cache ID and flags. */ - private void writeCacheInfo(BinaryOutputStream out) { + private void writeCacheInfo(PayloadOutputChannel payloadCh) { + BinaryOutputStream out = payloadCh.out(); + out.writeInt(cacheId); out.writeByte((byte)(keepBinary ? 1 : 0)); } @@ -501,4 +507,14 @@ class TcpClientCache<K, V> implements ClientCache<K, V> { private <T> T readObject(BinaryInputStream in) { return serDes.readObject(in, keepBinary); } + + /** */ + private <T> T readObject(PayloadInputChannel payloadCh) { + return readObject(payloadCh.in()); + } + + /** */ + private void writeObject(PayloadOutputChannel payloadCh, Object obj) { + serDes.writeObject(payloadCh.out(), obj); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java index d6097f2..66930e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.client.thin; +import java.io.DataInput; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -34,7 +35,11 @@ import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Arrays; import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -49,42 +54,45 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.client.ClientAuthenticationException; import org.apache.ignite.client.ClientAuthorizationException; import org.apache.ignite.client.ClientConnectionException; +import org.apache.ignite.client.ClientException; import org.apache.ignite.client.SslMode; import org.apache.ignite.client.SslProtocol; import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.binary.streams.BinaryOffheapOutputStream; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; import org.apache.ignite.internal.processors.platform.client.ClientStatus; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.io.GridUnsafeDataInput; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_0_0; +import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_1_0; +import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_2_0; /** * Implements {@link ClientChannel} over TCP. */ class TcpClientChannel implements ClientChannel { - /** Protocol version: 1.2.0. */ - private static final ProtocolVersion V1_2_0 = new ProtocolVersion((short)1, (short)2, (short)0); - - /** Protocol version: 1.1.0. */ - private static final ProtocolVersion V1_1_0 = new ProtocolVersion((short)1, (short)1, (short)0); - - /** Protocol version 1 0 0. */ - private static final ProtocolVersion V1_0_0 = new ProtocolVersion((short)1, (short)0, (short)0); - /** Supported protocol versions. */ private static final Collection<ProtocolVersion> supportedVers = Arrays.asList( - V1_2_0, + V1_2_0, V1_1_0, V1_0_0 ); + /** Timeout before next attempt to lock channel and process next response by current thread. */ + private static final long PAYLOAD_WAIT_TIMEOUT = 10L; + /** Protocol version agreed with the server. */ private ProtocolVersion ver = V1_2_0; @@ -97,9 +105,24 @@ class TcpClientChannel implements ClientChannel { /** Input stream. */ private final InputStream in; + /** Data input. */ + private final DataInput dataInput; + + /** Total bytes read by channel. */ + private long totalBytesRead; + /** Request id. */ private final AtomicLong reqId = new AtomicLong(1); + /** Send lock. */ + private final Lock sndLock = new ReentrantLock(); + + /** Receive lock. */ + private final Lock rcvLock = new ReentrantLock(); + + /** Pending requests. */ + private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>(); + /** Constructor. */ TcpClientChannel(ClientChannelConfiguration cfg) throws ClientConnectionException, ClientAuthenticationException { validateConfiguration(cfg); @@ -109,9 +132,13 @@ class TcpClientChannel implements ClientChannel { out = sock.getOutputStream(); in = sock.getInputStream(); + + GridUnsafeDataInput dis = new GridUnsafeDataInput(); + dis.inputStream(in); + dataInput = dis; } catch (IOException e) { - throw new ClientConnectionException(e); + throw handleIOError("addr=" + cfg.getAddress(), e); } handshake(cfg.getUserName(), cfg.getUserPassword()); @@ -122,71 +149,157 @@ class TcpClientChannel implements ClientChannel { in.close(); out.close(); sock.close(); + + for (ClientRequestFuture pendingReq : pendingReqs.values()) + pendingReq.onDone(new ClientConnectionException("Channel is closed")); } /** {@inheritDoc} */ - @Override public long send(ClientOperation op, Consumer<BinaryOutputStream> payloadWriter) + @Override public <T> T service(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter, + Function<PayloadInputChannel, T> payloadReader) throws ClientConnectionException, ClientAuthorizationException { + long id = send(op, payloadWriter); + + return receive(id, payloadReader); + } + + /** + * @param op Operation. + * @param payloadWriter Payload writer to stream or {@code null} if request has no payload. + * @return Request ID. + */ + private long send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter) throws ClientConnectionException { long id = reqId.getAndIncrement(); - try (BinaryOutputStream req = new BinaryHeapOutputStream(1024)) { - req.writeInt(0); // reserve an integer for the request size + // Only one thread at a time can have access to write to the channel. + sndLock.lock(); + + try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) { + pendingReqs.put(id, new ClientRequestFuture()); + + BinaryOutputStream req = payloadCh.out(); + + req.writeInt(0); // Reserve an integer for the request size. req.writeShort(op.code()); req.writeLong(id); if (payloadWriter != null) - payloadWriter.accept(req); + payloadWriter.accept(payloadCh); - req.writeInt(0, req.position() - 4); // actual size + req.writeInt(0, req.position() - 4); // Actual size. write(req.array(), req.position()); } + catch (Throwable t) { + pendingReqs.remove(id); + + throw t; + } + finally { + sndLock.unlock(); + } return id; } - /** {@inheritDoc} */ - @Override public <T> T receive(ClientOperation op, long reqId, Function<BinaryInputStream, T> payloadReader) + /** + * @param reqId ID of the request to receive the response for. + * @param payloadReader Payload reader from stream. + * @return Received operation payload or {@code null} if response has no payload. + */ + private <T> T receive(long reqId, Function<PayloadInputChannel, T> payloadReader) throws ClientConnectionException, ClientAuthorizationException { + ClientRequestFuture pendingReq = pendingReqs.get(reqId); + + assert pendingReq != null : "Pending request future not found for request " + reqId; + + // Each thread creates a future on request sent and returns a response when this future is completed. + // Only one thread at a time can have access to read from the channel. This thread reads the next available + // response and complete corresponding future. All other concurrent threads wait for their own futures with + // a timeout and periodically try to lock the channel to process the next response. + try { + while (true) { + if (rcvLock.tryLock()) { + try { + if (!pendingReq.isDone()) + processNextResponse(); + } + finally { + rcvLock.unlock(); + } + } + + try { + byte[] payload = pendingReq.get(PAYLOAD_WAIT_TIMEOUT); + + if (payload == null || payloadReader == null) + return null; - final int MIN_RES_SIZE = 8 + 4; // minimal response size: long (8 bytes) ID + int (4 bytes) status + return payloadReader.apply(new PayloadInputChannel(this, payload)); + } + catch (IgniteFutureTimeoutCheckedException ignore) { + // Next cycle if timed out. + } + } + } + catch (IgniteCheckedException e) { + if (e.getCause() instanceof ClientError) + throw (ClientError)e.getCause(); + + if (e.getCause() instanceof ClientException) + throw (ClientException)e.getCause(); + + throw new ClientException(e.getMessage(), e); + } + finally { + pendingReqs.remove(reqId); + } + } - int resSize = new BinaryHeapInputStream(read(4)).readInt(); + /** + * Process next response from the input stream and complete corresponding future. + */ + private void processNextResponse() throws ClientProtocolError, ClientConnectionException { + int resSize = readInt(); - if (resSize < 0) + if (resSize <= 0) throw new ClientProtocolError(String.format("Invalid response size: %s", resSize)); - if (resSize == 0) - return null; + long bytesReadOnStartReq = totalBytesRead; + + long resId = readLong(); + + ClientRequestFuture pendingReq = pendingReqs.get(resId); - BinaryInputStream resIn = new BinaryHeapInputStream(read(MIN_RES_SIZE)); + if (pendingReq == null) + throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId)); - long resId = resIn.readLong(); + int status; - if (resId != reqId) - throw new ClientProtocolError(String.format("Unexpected response ID [%s], [%s] was expected", resId, reqId)); + BinaryInputStream resIn; - int status = resIn.readInt(); + status = readInt(); - if (status != 0) { - resIn = new BinaryHeapInputStream(read(resSize - MIN_RES_SIZE)); + int hdrSize = (int)(totalBytesRead - bytesReadOnStartReq); + + if (status == 0) { + if (resSize <= hdrSize) + pendingReq.onDone(); + else + pendingReq.onDone(read(resSize - hdrSize)); + } + else { + resIn = new BinaryHeapInputStream(read(resSize - hdrSize)); String err = new BinaryReaderExImpl(null, resIn, null, true).readString(); switch (status) { case ClientStatus.SECURITY_VIOLATION: - throw new ClientAuthorizationException(); + pendingReq.onDone(new ClientAuthorizationException()); default: - throw new ClientServerError(err, status, reqId); + pendingReq.onDone(new ClientServerError(err, status, resId)); } } - - if (resSize <= MIN_RES_SIZE || payloadReader == null) - return null; - - BinaryInputStream payload = new BinaryHeapInputStream(read(resSize - MIN_RES_SIZE)); - - return payloadReader.apply(payload); } /** {@inheritDoc} */ @@ -249,7 +362,7 @@ class TcpClientChannel implements ClientChannel { /** Send handshake request. */ private void handshakeReq(String user, String pwd) throws ClientConnectionException { - try (BinaryOutputStream req = new BinaryOffheapOutputStream(32)) { + try (BinaryOutputStream req = new BinaryHeapOutputStream(32)) { req.writeInt(0); // reserve an integer for the request size req.writeByte((byte)1); // handshake code, always 1 req.writeShort(ver.major()); @@ -271,7 +384,7 @@ class TcpClientChannel implements ClientChannel { /** Receive and handle handshake response. */ private void handshakeRes(String user, String pwd) throws ClientConnectionException, ClientAuthenticationException { - int resSize = new BinaryHeapInputStream(read(4)).readInt(); + int resSize = readInt(); if (resSize <= 0) throw new ClientProtocolError(String.format("Invalid handshake response size: %s", resSize)); @@ -310,7 +423,7 @@ class TcpClientChannel implements ClientChannel { } } catch (IOException e) { - throw new ClientConnectionException(e); + throw handleIOError(e); } } } @@ -326,18 +439,68 @@ class TcpClientChannel implements ClientChannel { bytesNum = in.read(bytes, readBytesNum, len - readBytesNum); } catch (IOException e) { - throw new ClientConnectionException(e); + throw handleIOError(e); } if (bytesNum < 0) - throw new ClientConnectionException(); + throw handleIOError(null); readBytesNum += bytesNum; } + totalBytesRead += readBytesNum; + return bytes; } + /** + * Read long value from input stream. + */ + private long readLong() { + try { + long val = dataInput.readLong(); + + totalBytesRead += Long.BYTES; + + return val; + } + catch (IOException e) { + throw handleIOError(e); + } + } + + /** + * Read int value from input stream. + */ + private int readInt() { + try { + int val = dataInput.readInt(); + + totalBytesRead += Integer.BYTES; + + return val; + } + catch (IOException e) { + throw handleIOError(e); + } + } + + /** + * Read short value from input stream. + */ + private short readShort() { + try { + short val = dataInput.readShort(); + + totalBytesRead += Short.BYTES; + + return val; + } + catch (IOException e) { + throw handleIOError(e); + } + } + /** Write bytes to the output stream. */ private void write(byte[] bytes, int len) throws ClientConnectionException { try { @@ -345,10 +508,31 @@ class TcpClientChannel implements ClientChannel { out.flush(); } catch (IOException e) { - throw new ClientConnectionException(e); + throw handleIOError(e); } } + /** + * @param ex IO exception (cause). + */ + private ClientException handleIOError(@Nullable IOException ex) { + return handleIOError("sock=" + sock, ex); + } + + /** + * @param chInfo Additional channel info + * @param ex IO exception (cause). + */ + private ClientException handleIOError(String chInfo, @Nullable IOException ex) { + return new ClientConnectionException("Ignite cluster is unavailable [" + chInfo + ']', ex); + } + + /** + * + */ + private static class ClientRequestFuture extends GridFutureAdapter<byte[]> { + } + /** SSL Socket Factory. */ private static class ClientSslSocketFactory { /** Trust manager ignoring all certificate checks. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java index 5c1275b..4dbcb93 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.binary.streams.BinaryOutputStream; -import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.marshaller.MarshallerContext; @@ -103,7 +102,7 @@ public class TcpIgniteClient implements IgniteClient { @Override public <K, V> ClientCache<K, V> getOrCreateCache(String name) throws ClientException { ensureCacheName(name); - ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req -> writeString(name, req)); + ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_NAME, req -> writeString(name, req.out())); return new TcpClientCache<>(name, ch, marsh); } @@ -114,7 +113,7 @@ public class TcpIgniteClient implements IgniteClient { ensureCacheConfiguration(cfg); ch.request(ClientOperation.CACHE_GET_OR_CREATE_WITH_CONFIGURATION, - req -> serDes.cacheConfiguration(cfg, req, toClientVersion(ch.serverVersion()))); + req -> serDes.cacheConfiguration(cfg, req.out(), req.clientChannel().serverVersion())); return new TcpClientCache<>(cfg.getName(), ch, marsh); } @@ -128,21 +127,21 @@ public class TcpIgniteClient implements IgniteClient { /** {@inheritDoc} */ @Override public Collection<String> cacheNames() throws ClientException { - return ch.service(ClientOperation.CACHE_GET_NAMES, res -> Arrays.asList(BinaryUtils.doReadStringArray(res))); + return ch.service(ClientOperation.CACHE_GET_NAMES, res -> Arrays.asList(BinaryUtils.doReadStringArray(res.in()))); } /** {@inheritDoc} */ @Override public void destroyCache(String name) throws ClientException { ensureCacheName(name); - ch.request(ClientOperation.CACHE_DESTROY, req -> req.writeInt(ClientUtils.cacheId(name))); + ch.request(ClientOperation.CACHE_DESTROY, req -> req.out().writeInt(ClientUtils.cacheId(name))); } /** {@inheritDoc} */ @Override public <K, V> ClientCache<K, V> createCache(String name) throws ClientException { ensureCacheName(name); - ch.request(ClientOperation.CACHE_CREATE_WITH_NAME, req -> writeString(name, req)); + ch.request(ClientOperation.CACHE_CREATE_WITH_NAME, req -> writeString(name, req.out())); return new TcpClientCache<>(name, ch, marsh); } @@ -152,21 +151,11 @@ public class TcpIgniteClient implements IgniteClient { ensureCacheConfiguration(cfg); ch.request(ClientOperation.CACHE_CREATE_WITH_CONFIGURATION, - req -> serDes.cacheConfiguration(cfg, req, toClientVersion(ch.serverVersion()))); + req -> serDes.cacheConfiguration(cfg, req.out(), req.clientChannel().serverVersion())); return new TcpClientCache<>(cfg.getName(), ch, marsh); } - /** - * Converts {@link ProtocolVersion} to {@link ClientListenerProtocolVersion}. - * - * @param srvVer Server protocol version. - * @return Client protocol version. - */ - private ClientListenerProtocolVersion toClientVersion(ProtocolVersion srvVer) { - return ClientListenerProtocolVersion.create(srvVer.major(), srvVer.minor(), srvVer.patch()); - } - /** {@inheritDoc} */ @Override public IgniteBinary binary() { return binary; @@ -177,7 +166,9 @@ public class TcpIgniteClient implements IgniteClient { if (qry == null) throw new NullPointerException("qry"); - Consumer<BinaryOutputStream> qryWriter = out -> { + Consumer<PayloadOutputChannel> qryWriter = payloadCh -> { + BinaryOutputStream out = payloadCh.out(); + out.writeInt(0); // no cache ID out.writeByte((byte)1); // keep binary serDes.write(qry, out); @@ -249,7 +240,7 @@ public class TcpIgniteClient implements IgniteClient { try { ch.request( ClientOperation.PUT_BINARY_TYPE, - req -> serDes.binaryMetadata(((BinaryTypeImpl)meta).metadata(), req) + req -> serDes.binaryMetadata(((BinaryTypeImpl)meta).metadata(), req.out()) ); } catch (ClientException e) { @@ -285,10 +276,10 @@ public class TcpIgniteClient implements IgniteClient { try { meta = ch.service( ClientOperation.GET_BINARY_TYPE, - req -> req.writeInt(typeId), + req -> req.out().writeInt(typeId), res -> { try { - return res.readBoolean() ? serDes.binaryMetadata(res) : null; + return res.in().readBoolean() ? serDes.binaryMetadata(res.in()) : null; } catch (IOException e) { throw new BinaryObjectException(e); @@ -341,12 +332,14 @@ public class TcpIgniteClient implements IgniteClient { try { res = ch.service( ClientOperation.REGISTER_BINARY_TYPE_NAME, - req -> { - req.writeByte(platformId); - req.writeInt(typeId); - writeString(clsName, req); + payloadCh -> { + BinaryOutputStream out = payloadCh.out(); + + out.writeByte(platformId); + out.writeInt(typeId); + writeString(clsName, out); }, - BinaryInputStream::readBoolean + payloadCh -> payloadCh.in().readBoolean() ); } catch (ClientException e) { @@ -361,9 +354,8 @@ public class TcpIgniteClient implements IgniteClient { } /** {@inheritDoc} */ - @Override @Deprecated - public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException { + @Override public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException { return registerClassName(platformId, typeId, clsName, false); } @@ -398,10 +390,12 @@ public class TcpIgniteClient implements IgniteClient { clsName = ch.service( ClientOperation.GET_BINARY_TYPE_NAME, req -> { - req.writeByte(platformId); - req.writeInt(typeId); + BinaryOutputStream out = req.out(); + + out.writeByte(platformId); + out.writeInt(typeId); }, - TcpIgniteClient.this::readString + res -> readString(res.in()) ); } catch (ClientException e) { diff --git a/modules/core/src/test/java/org/apache/ignite/client/AsyncChannelTest.java b/modules/core/src/test/java/org/apache/ignite/client/AsyncChannelTest.java new file mode 100644 index 0000000..543321c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/client/AsyncChannelTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CachePeekMode; +import org.apache.ignite.cache.query.Query; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * Async channel tests. + */ +public class AsyncChannelTest extends GridCommonAbstractTest { + /** Nodes count. */ + private static final int NODES_CNT = 3; + + /** Threads count. */ + private static final int THREADS_CNT = 25; + + /** Cache name. */ + private static final String CACHE_NAME = "tx_cache"; + + /** Client connector address. */ + private static final String CLIENT_CONN_ADDR = "127.0.0.1:" + ClientConnectorConfiguration.DFLT_PORT; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName).setCacheConfiguration( + new CacheConfiguration(CACHE_NAME).setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrids(NODES_CNT); + + awaitPartitionMapExchange(); + } + + /** + * Test that client channel works in async mode. + */ + @Test + public void testAsyncRequests() throws Exception { + try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(CLIENT_CONN_ADDR))) { + Ignite ignite = grid(0); + + IgniteCache<Integer, Integer> igniteCache = ignite.cache(CACHE_NAME); + ClientCache<Integer, Integer> clientCache = client.cache(CACHE_NAME); + + clientCache.clear(); + + Lock keyLock = igniteCache.lock(0); + + IgniteInternalFuture fut; + + keyLock.lock(); + + try { + CountDownLatch latch = new CountDownLatch(1); + + fut = GridTestUtils.runAsync(() -> { + latch.countDown(); + + // This request is blocked until we explicitly unlock key in another thread. + clientCache.put(0, 0); + + clientCache.put(1, 1); + + assertEquals(10, clientCache.size(CachePeekMode.PRIMARY)); + }); + + latch.await(); + + for (int i = 2; i < 10; i++) { + clientCache.put(i, i); + + assertEquals((Integer)i, igniteCache.get(i)); + assertEquals((Integer)i, clientCache.get(i)); + } + + // Parallel thread must be blocked on key 0. + assertFalse(clientCache.containsKey(1)); + } + finally { + keyLock.unlock(); + } + + fut.get(); + + assertTrue(clientCache.containsKey(1)); + } + } + + /** + * Test multiple concurrent async requests. + */ + @Test + public void testConcurrentRequests() throws Exception { + try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(CLIENT_CONN_ADDR))) { + ClientCache<Integer, Integer> clientCache = client.cache(CACHE_NAME); + + clientCache.clear(); + + AtomicInteger keyCnt = new AtomicInteger(); + + CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT); + + GridTestUtils.runMultiThreaded(() -> { + try { + barrier.await(); + } + catch (Exception e) { + fail(); + } + + for (int i = 0; i < 100; i++) { + int key = keyCnt.incrementAndGet(); + + clientCache.put(key, key); + + assertEquals(key, (long)clientCache.get(key)); + } + + }, THREADS_CNT, "thin-client-thread"); + } + } + + /** + * Test multiple concurrent async queries. + */ + @Test + public void testConcurrentQueries() throws Exception { + try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(CLIENT_CONN_ADDR))) { + ClientCache<Integer, Integer> clientCache = client.cache(CACHE_NAME); + + clientCache.clear(); + + for (int i = 0; i < 10; i++) + clientCache.put(i, i); + + CyclicBarrier barrier = new CyclicBarrier(THREADS_CNT); + + GridTestUtils.runMultiThreaded(() -> { + try { + barrier.await(); + } + catch (Exception e) { + fail(); + } + + for (int i = 0; i < 10; i++) { + Query<Cache.Entry<Integer, String>> qry = new ScanQuery<Integer, String>().setPageSize(1); + + try (QueryCursor<Cache.Entry<Integer, String>> cur = clientCache.query(qry)) { + int cacheSize = clientCache.size(CachePeekMode.PRIMARY); + int curSize = cur.getAll().size(); + + assertEquals(cacheSize, curSize); + } + } + }, THREADS_CNT, "thin-client-thread"); + } + } +} diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java index 0558b0a..80a77ff 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java @@ -36,7 +36,8 @@ import org.junit.runners.Suite; IgniteBinaryQueryTest.class, SslParametersTest.class, ConnectionTest.class, - ConnectToStartingNodeTest.class + ConnectToStartingNodeTest.class, + AsyncChannelTest.class }) public class ClientTestSuite { // No-op.