This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 37fe3e86f1 IGNITE-19888 Java client: Track observable timestamp (#2371) 37fe3e86f1 is described below commit 37fe3e86f176e3ca43b605d42362dfd9acee1583 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Tue Aug 1 14:14:30 2023 +0300 IGNITE-19888 Java client: Track observable timestamp (#2371) * Add observable timestamp to standard response header in client protocol (including errors, because data modification can occur before the error, like in a compute task - so we still need to update client-side timestamp) * Add observable timestamp to TX_BEGIN and SQL_EXEC client requests * Propagate latest known timestamp in Java client * Fix .NET and C++ clients to ignore the timestamp for now (separate tickets exist) --- .../internal/client/proto/ClientMessagePacker.java | 50 ++++++ modules/client-handler/build.gradle | 2 + .../apache/ignite/client/handler/TestServer.java | 22 ++- .../ignite/client/handler/ClientHandlerModule.java | 18 +- .../handler/ClientInboundMessageHandler.java | 40 ++++- .../requests/sql/ClientSqlExecuteRequest.java | 16 +- .../handler/requests/table/ClientTableCommon.java | 16 +- .../table/ClientTupleContainsKeyRequest.java | 2 +- .../table/ClientTupleDeleteAllExactRequest.java | 2 +- .../table/ClientTupleDeleteAllRequest.java | 2 +- .../table/ClientTupleDeleteExactRequest.java | 2 +- .../requests/table/ClientTupleDeleteRequest.java | 2 +- .../requests/table/ClientTupleGetAllRequest.java | 2 +- .../table/ClientTupleGetAndDeleteRequest.java | 2 +- .../table/ClientTupleGetAndReplaceRequest.java | 2 +- .../table/ClientTupleGetAndUpsertRequest.java | 2 +- .../requests/table/ClientTupleGetRequest.java | 2 +- .../table/ClientTupleInsertAllRequest.java | 2 +- .../requests/table/ClientTupleInsertRequest.java | 2 +- .../table/ClientTupleReplaceExactRequest.java | 2 +- .../requests/table/ClientTupleReplaceRequest.java | 2 +- .../table/ClientTupleUpsertAllRequest.java | 2 +- .../requests/table/ClientTupleUpsertRequest.java | 2 +- .../requests/tx/ClientTransactionBeginRequest.java | 47 +++-- .../ignite/internal/client/ClientChannel.java | 15 +- .../ignite/internal/client/ReliableChannel.java | 23 +++ .../ignite/internal/client/TcpClientChannel.java | 14 ++ .../ignite/internal/client/sql/ClientSession.java | 2 + .../internal/client/tx/ClientTransactions.java | 5 +- .../apache/ignite/client/ClientMetricsTest.java | 2 +- .../client/ObservableTimestampPropagationTest.java | 97 +++++++++++ .../apache/ignite/client/RequestBalancingTest.java | 2 +- .../ignite/client/TestClientHandlerModule.java | 15 +- .../java/org/apache/ignite/client/TestServer.java | 46 ++++- .../org/apache/ignite/client/fakes/FakeIgnite.java | 103 +---------- .../apache/ignite/client/fakes/FakeTxManager.java | 191 +++++++++++++++++++++ .../RepeatedFinishClientTransactionTest.java | 7 +- .../cpp/ignite/client/detail/node_connection.cpp | 3 + .../cpp/ignite/client/detail/sql/sql_impl.cpp | 30 ++-- .../client/detail/transaction/transactions_impl.h | 3 + .../platforms/cpp/ignite/odbc/query/data_query.cpp | 3 + .../platforms/cpp/ignite/odbc/sql_connection.cpp | 3 + .../dotnet/Apache.Ignite.Tests/FakeServer.cs | 1 + .../dotnet/Apache.Ignite.Tests/MetricsTests.cs | 2 +- .../platforms/dotnet/Apache.Ignite/ErrorGroups.cs | 1 - .../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 3 + .../Internal/Proto/MsgPack/MsgPackWriter.cs | 5 + .../dotnet/Apache.Ignite/Internal/Sql/Sql.cs | 3 + .../Internal/Transactions/Transactions.cs | 11 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- .../org/apache/ignite/internal/tx/TxManager.java | 2 +- .../internal/tx/impl/IgniteTransactionsImpl.java | 31 ++-- .../ignite/internal/tx/impl/TxManagerImpl.java | 2 +- 53 files changed, 680 insertions(+), 191 deletions(-) diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java index 3584b6cdad..f4ef190be4 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java @@ -46,6 +46,11 @@ public class ClientMessagePacker implements AutoCloseable { */ private boolean closed; + /** + * Metadata. + */ + private @Nullable Object meta; + /** * Constructor. * @@ -179,6 +184,29 @@ public class ClientMessagePacker implements AutoCloseable { } } + /** + * Reserve space for long value. + * + * @return Index of reserved space. + */ + public int reserveLong() { + buf.writeByte(Code.INT64); + var index = buf.writerIndex(); + + buf.writeLong(0); + return index; + } + + /** + * Set long value at reserved index (see {@link #reserveLong()}). + * + * @param index Index. + * @param v Value. + */ + public void setLong(int index, long v) { + buf.setLong(index, v); + } + /** * Writes a long value. * @@ -559,6 +587,10 @@ public class ClientMessagePacker implements AutoCloseable { packInt(vals.length); + if (vals.length == 0) { + return; + } + // Builder with inline schema. // Every element in vals is represented by 3 tuple elements: type, scale, value. var builder = new BinaryTupleBuilder(vals.length * 3); @@ -631,6 +663,24 @@ public class ClientMessagePacker implements AutoCloseable { writePayload(buf); } + /** + * Gets metadata. + * + * @return Metadata. + */ + public @Nullable Object meta() { + return meta; + } + + /** + * Sets metadata. + * + * @param meta Metadata. + */ + public void meta(@Nullable Object meta) { + this.meta = meta; + } + /** * {@inheritDoc} */ diff --git a/modules/client-handler/build.gradle b/modules/client-handler/build.gradle index 008e6208ca..17907c6f81 100644 --- a/modules/client-handler/build.gradle +++ b/modules/client-handler/build.gradle @@ -35,6 +35,7 @@ dependencies { implementation project(':ignite-schema') implementation project(':ignite-security') implementation project(':ignite-metrics') + implementation project(':ignite-transactions') implementation libs.jetbrains.annotations implementation libs.fastutil.core implementation libs.slf4j.jdk14 @@ -59,6 +60,7 @@ dependencies { integrationTestImplementation project(':ignite-table') integrationTestImplementation project(':ignite-metrics') integrationTestImplementation project(':ignite-security') + integrationTestImplementation project(':ignite-transactions') integrationTestImplementation(testFixtures(project(':ignite-configuration'))) integrationTestImplementation(testFixtures(project(':ignite-core'))) integrationTestImplementation libs.msgpack.core diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java index 552806934a..b1ddc6c992 100644 --- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java +++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java @@ -32,16 +32,17 @@ import org.apache.ignite.internal.configuration.ConfigurationManager; import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage; import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; +import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.metrics.MetricManager; import org.apache.ignite.internal.network.configuration.NetworkConfiguration; import org.apache.ignite.internal.security.authentication.AuthenticationManager; import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NettyBootstrapFactory; import org.apache.ignite.sql.IgniteSql; -import org.apache.ignite.tx.IgniteTransactions; import org.junit.jupiter.api.TestInfo; import org.mockito.Mockito; @@ -115,10 +116,21 @@ public class TestServer { Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id"); Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id"); - var module = new ClientHandlerModule(mock(QueryProcessor.class), mock(IgniteTablesInternal.class), mock(IgniteTransactions.class), - registry, mock(IgniteCompute.class), clusterService, bootstrapFactory, mock(IgniteSql.class), - () -> CompletableFuture.completedFuture(UUID.randomUUID()), mock(MetricManager.class), metrics, - authenticationManager(), authenticationConfiguration + var module = new ClientHandlerModule( + mock(QueryProcessor.class), + mock(IgniteTablesInternal.class), + mock(IgniteTransactionsImpl.class), + registry, + mock(IgniteCompute.class), + clusterService, + bootstrapFactory, + mock(IgniteSql.class), + () -> CompletableFuture.completedFuture(UUID.randomUUID()), + mock(MetricManager.class), + metrics, + authenticationManager(), + authenticationConfiguration, + new HybridClockImpl() ); module.start(); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java index a8fe8e1f79..6dacd07709 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java @@ -36,6 +36,7 @@ import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.internal.client.proto.ClientMessageDecoder; import org.apache.ignite.internal.configuration.AuthenticationConfiguration; import org.apache.ignite.internal.configuration.ConfigurationRegistry; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; @@ -44,13 +45,13 @@ import org.apache.ignite.internal.network.ssl.SslContextProvider; import org.apache.ignite.internal.security.authentication.AuthenticationManager; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.lang.ErrorGroups; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NettyBootstrapFactory; import org.apache.ignite.sql.IgniteSql; -import org.apache.ignite.tx.IgniteTransactions; /** * Client handler module maintains TCP endpoint for thin client connections. @@ -66,7 +67,7 @@ public class ClientHandlerModule implements IgniteComponent { private final IgniteTablesInternal igniteTables; /** Ignite transactions API. */ - private final IgniteTransactions igniteTransactions; + private final IgniteTransactionsImpl igniteTransactions; /** Ignite SQL API. */ private final IgniteSql sql; @@ -102,6 +103,8 @@ public class ClientHandlerModule implements IgniteComponent { private final AuthenticationConfiguration authenticationConfiguration; + private final HybridClock clock; + /** * Constructor. * @@ -117,11 +120,12 @@ public class ClientHandlerModule implements IgniteComponent { * @param metricManager Metric manager. * @param authenticationManager Authentication manager. * @param authenticationConfiguration Authentication configuration. + * @param clock Hybrid clock. */ public ClientHandlerModule( QueryProcessor queryProcessor, IgniteTablesInternal igniteTables, - IgniteTransactions igniteTransactions, + IgniteTransactionsImpl igniteTransactions, ConfigurationRegistry registry, IgniteCompute igniteCompute, ClusterService clusterService, @@ -131,7 +135,8 @@ public class ClientHandlerModule implements IgniteComponent { MetricManager metricManager, ClientHandlerMetricSource metrics, AuthenticationManager authenticationManager, - AuthenticationConfiguration authenticationConfiguration) { + AuthenticationConfiguration authenticationConfiguration, + HybridClock clock) { assert igniteTables != null; assert registry != null; assert queryProcessor != null; @@ -144,6 +149,7 @@ public class ClientHandlerModule implements IgniteComponent { assert metrics != null; assert authenticationManager != null; assert authenticationConfiguration != null; + assert clock != null; this.queryProcessor = queryProcessor; this.igniteTables = igniteTables; @@ -158,6 +164,7 @@ public class ClientHandlerModule implements IgniteComponent { this.metrics = metrics; this.authenticationManager = authenticationManager; this.authenticationConfiguration = authenticationConfiguration; + this.clock = clock; } /** {@inheritDoc} */ @@ -291,7 +298,8 @@ public class ClientHandlerModule implements IgniteComponent { sql, clusterId, metrics, - authenticationManager); + authenticationManager, + clock); authenticationConfiguration.listen(clientInboundMessageHandler); return clientInboundMessageHandler; } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index ef195a296d..7e648904b8 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -92,6 +92,8 @@ import org.apache.ignite.internal.client.proto.ProtocolVersion; import org.apache.ignite.internal.client.proto.ResponseFlags; import org.apache.ignite.internal.client.proto.ServerMessageType; import org.apache.ignite.internal.configuration.AuthenticationView; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler; import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler; import org.apache.ignite.internal.logger.IgniteLogger; @@ -104,6 +106,7 @@ import org.apache.ignite.internal.security.authentication.UserDetails; import org.apache.ignite.internal.security.authentication.UsernamePasswordRequest; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteInternalCheckedException; @@ -113,7 +116,6 @@ import org.apache.ignite.network.ClusterService; import org.apache.ignite.security.AuthenticationException; import org.apache.ignite.security.AuthenticationType; import org.apache.ignite.sql.IgniteSql; -import org.apache.ignite.tx.IgniteTransactions; import org.jetbrains.annotations.Nullable; /** @@ -128,7 +130,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im private final IgniteTablesInternal igniteTables; /** Ignite transactions API. */ - private final IgniteTransactions igniteTransactions; + private final IgniteTransactionsImpl igniteTransactions; /** JDBC Handler. */ private final JdbcQueryEventHandler jdbcQueryEventHandler; @@ -157,6 +159,9 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im /** Metrics. */ private final ClientHandlerMetricSource metrics; + /** Hybrid clock. */ + private final HybridClock clock; + /** Context. */ private ClientContext clientContext; @@ -185,10 +190,11 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im * @param clusterId Cluster ID. * @param metrics Metrics. * @param authenticationManager Authentication manager. + * @param clock Hybrid clock. */ public ClientInboundMessageHandler( IgniteTablesInternal igniteTables, - IgniteTransactions igniteTransactions, + IgniteTransactionsImpl igniteTransactions, QueryProcessor processor, ClientConnectorView configuration, IgniteCompute compute, @@ -196,7 +202,8 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im IgniteSql sql, UUID clusterId, ClientHandlerMetricSource metrics, - AuthenticationManager authenticationManager + AuthenticationManager authenticationManager, + HybridClock clock ) { assert igniteTables != null; assert igniteTransactions != null; @@ -208,6 +215,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im assert clusterId != null; assert metrics != null; assert authenticationManager != null; + assert clock != null; this.igniteTables = igniteTables; this.igniteTransactions = igniteTransactions; @@ -218,6 +226,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im this.clusterId = clusterId; this.metrics = metrics; this.authenticationManager = authenticationManager; + this.clock = clock; jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources); jdbcQueryEventHandler = @@ -386,6 +395,11 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im packer.packLong(requestId); writeFlags(packer, ctx); + // Include server timestamp in error response as well: + // an operation can modify data and then throw an exception (e.g. Compute task), + // so we still need to update client-side timestamp to preserve causality guarantees. + packer.packLong(observableTimestamp(null)); + writeErrorCore(err, packer); write(packer, ctx); @@ -456,6 +470,9 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im out.packInt(ServerMessageType.RESPONSE); out.packLong(requestId); writeFlags(out, ctx); + + // Observable timestamp should be calculated after the operation is processed; reserve space, write later. + int observableTimestampIdx = out.reserveLong(); out.packNil(); // No error. var fut = processOperation(in, out, opCode); @@ -463,6 +480,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im if (fut == null) { // Operation completed synchronously. in.close(); + out.setLong(observableTimestampIdx, observableTimestamp(out)); write(out, ctx); if (LOG.isTraceEnabled()) { @@ -486,6 +504,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im metrics.requestsFailedIncrement(); } else { + out.setLong(observableTimestampIdx, observableTimestamp(out)); write(out, ctx); metrics.requestsProcessedIncrement(); @@ -724,4 +743,17 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im return null; } + + private long observableTimestamp(@Nullable ClientMessagePacker out) { + // Certain operations can override the timestamp and provide it in the meta object. + if (out != null) { + Object meta = out.meta(); + + if (meta instanceof HybridTimestamp) { + return ((HybridTimestamp) meta).longValue(); + } + } + + return clock.now().longValue(); + } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java index b68da2fc36..ad8675dd08 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.lang.IgniteInternalCheckedException; import org.apache.ignite.lang.IgniteInternalException; @@ -68,7 +69,7 @@ public class ClientSqlExecuteRequest { IgniteSql sql, ClientResourceRegistry resources, ClientHandlerMetricSource metrics) { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); Session session = readSession(in, sql); Statement statement = readStatement(in, sql); Object[] arguments = in.unpackObjectArrayFromBinaryTuple(); @@ -78,9 +79,20 @@ public class ClientSqlExecuteRequest { arguments = ArrayUtils.OBJECT_EMPTY_ARRAY; } + // TODO IGNITE-19898 SQL implicit RO transaction should use observation timestamp. + HybridTimestamp unused = HybridTimestamp.nullableHybridTimestamp(in.unpackLong()); + return session .executeAsync(tx, statement, arguments) - .thenCompose(asyncResultSet -> writeResultSetAsync(out, resources, asyncResultSet, session, metrics)); + .thenCompose(asyncResultSet -> { + //noinspection StatementWithEmptyBody + if (tx == null) { + // TODO IGNITE-19898 Return readTimestamp from implicit RO TX to the client + // out.meta(asyncResultSet.tx().readTimestamp()); + } + + return writeResultSetAsync(out, resources, asyncResultSet, session, metrics); + }); } private static CompletionStage<Void> writeResultSetAsync( diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java index 0c05718307..ad25f0be1e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java @@ -42,13 +42,13 @@ import org.apache.ignite.internal.schema.SchemaRegistry; import org.apache.ignite.internal.schema.TemporalNativeType; import org.apache.ignite.internal.table.IgniteTablesInternal; import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteInternalCheckedException; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.sql.ColumnType; import org.apache.ignite.table.Tuple; import org.apache.ignite.table.manager.IgniteTables; -import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -367,16 +367,26 @@ public class ClientTableCommon { * Reads transaction. * * @param in Unpacker. + * @param out Packer. * @param resources Resource registry. * @return Transaction, if present, or null. */ - public static @Nullable Transaction readTx(ClientMessageUnpacker in, ClientResourceRegistry resources) { + public static @Nullable InternalTransaction readTx( + ClientMessageUnpacker in, ClientMessagePacker out, ClientResourceRegistry resources) { if (in.tryUnpackNil()) { return null; } try { - return resources.get(in.unpackLong()).get(Transaction.class); + var tx = resources.get(in.unpackLong()).get(InternalTransaction.class); + + if (tx != null && tx.isReadOnly()) { + // For read-only tx, override observable timestamp that we send to the client: + // use readTimestamp() instead of now(). + out.meta(tx.readTimestamp()); + } + + return tx; } catch (IgniteInternalCheckedException e) { throw new IgniteException(e.traceId(), e.code(), e.getMessage(), e); } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java index e3564e33b8..9e21b2bd46 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java @@ -47,7 +47,7 @@ public class ClientTupleContainsKeyRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var keyTuple = readTuple(in, table, true); return table.recordView().getAsync(tx, keyTuple) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java index 7f203d0051..4ebaac2182 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java @@ -48,7 +48,7 @@ public class ClientTupleDeleteAllExactRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuples = readTuples(in, table, false); return table.recordView().deleteAllExactAsync(tx, tuples) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java index 3bcb103f66..2b991d444d 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java @@ -49,7 +49,7 @@ public class ClientTupleDeleteAllRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuples = readTuples(in, table, true); return table.recordView().deleteAllAsync(tx, tuples).thenAccept(skippedTuples -> diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java index 0d9a480403..55aac9821e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java @@ -47,7 +47,7 @@ public class ClientTupleDeleteExactRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, false); return table.recordView().deleteExactAsync(tx, tuple).thenAccept(res -> { diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java index 9b43617036..cda8f6d7b6 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java @@ -47,7 +47,7 @@ public class ClientTupleDeleteRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, true); return table.recordView().deleteAsync(tx, tuple).thenAccept(res -> { diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java index 3f2b74886b..ca63e74323 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java @@ -49,7 +49,7 @@ public class ClientTupleGetAllRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var keyTuples = readTuples(in, table, true); return table.recordView().getAllAsync(tx, keyTuples).thenAccept(tuples -> diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java index 91197d6bc1..c2b894c70e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java @@ -48,7 +48,7 @@ public class ClientTupleGetAndDeleteRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, true); return table.recordView().getAndDeleteAsync(tx, tuple).thenAccept( diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java index 7a78ecc44c..626750d2f2 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java @@ -48,7 +48,7 @@ public class ClientTupleGetAndReplaceRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, false); return table.recordView().getAndReplaceAsync(tx, tuple).thenAccept( diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java index 9245336c45..873a0e6758 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java @@ -48,7 +48,7 @@ public class ClientTupleGetAndUpsertRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, false); return table.recordView().getAndUpsertAsync(tx, tuple).thenAccept( diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java index 4d6530a8b7..51c88acfa2 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java @@ -48,7 +48,7 @@ public class ClientTupleGetRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var keyTuple = readTuple(in, table, true); return table.recordView().getAsync(tx, keyTuple) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java index 0af2351882..cc7291643b 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java @@ -48,7 +48,7 @@ public class ClientTupleInsertAllRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuples = readTuples(in, table, false); return table.recordView().insertAllAsync(tx, tuples).thenAccept(skippedTuples -> diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java index 81cf7f71e9..5398ea50fd 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java @@ -47,7 +47,7 @@ public class ClientTupleInsertRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, false); return table.recordView().insertAsync(tx, tuple).thenAccept(res -> { diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java index 4d10dd7859..0c1955e21b 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java @@ -48,7 +48,7 @@ public class ClientTupleReplaceExactRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var schema = readSchema(in, table); var oldTuple = readTuple(in, false, schema); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java index e7df36a156..46b1f48958 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java @@ -47,7 +47,7 @@ public class ClientTupleReplaceRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, false); return table.recordView().replaceAsync(tx, tuple).thenAccept(res -> { diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java index a4f178d1ea..63e9192bba 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java @@ -47,7 +47,7 @@ public class ClientTupleUpsertAllRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuples = readTuples(in, table, false); return table.recordView().upsertAllAsync(tx, tuples) diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java index f78990d921..b1d3bcf204 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java @@ -47,7 +47,7 @@ public class ClientTupleUpsertRequest { ClientResourceRegistry resources ) { return readTableAsync(in, tables).thenCompose(table -> { - var tx = readTx(in, resources); + var tx = readTx(in, out, resources); var tuple = readTuple(in, table, false); return table.recordView().upsertAsync(tx, tuple).thenAccept(v -> out.packInt(table.schemaView().lastSchemaVersion())); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java index 5f05681c57..8464cc4a89 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java @@ -23,10 +23,11 @@ import org.apache.ignite.client.handler.ClientResource; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.lang.IgniteInternalCheckedException; -import org.apache.ignite.lang.IgniteInternalException; -import org.apache.ignite.tx.IgniteTransactions; import org.apache.ignite.tx.TransactionOptions; +import org.jetbrains.annotations.Nullable; /** * Client transaction begin request. @@ -42,28 +43,42 @@ public class ClientTransactionBeginRequest { * @param metrics Metrics. * @return Future. */ - public static CompletableFuture<Void> process( + public static @Nullable CompletableFuture<Void> process( ClientMessageUnpacker in, ClientMessagePacker out, - IgniteTransactions transactions, + IgniteTransactionsImpl transactions, ClientResourceRegistry resources, - ClientHandlerMetricSource metrics) { + ClientHandlerMetricSource metrics) throws IgniteInternalCheckedException { TransactionOptions options = null; + HybridTimestamp observableTs = null; - if (in.unpackBoolean()) { + boolean readOnly = in.unpackBoolean(); + if (readOnly) { options = new TransactionOptions().readOnly(true); + + // Timestamp makes sense only for read-only transactions. + observableTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong()); + } + + // NOTE: we don't use beginAsync here because it is synchronous anyway. + var tx = transactions.begin(options, observableTs); + + if (readOnly) { + // For read-only tx, override observable timestamp that we send to the client: + // use readTimestamp() instead of now(). + out.meta(tx.readTimestamp()); } - return transactions.beginAsync(options).thenAccept(tx -> { - try { - long resourceId = resources.put(new ClientResource(tx, tx::rollbackAsync)); - out.packLong(resourceId); + try { + long resourceId = resources.put(new ClientResource(tx, tx::rollbackAsync)); + out.packLong(resourceId); + + metrics.transactionsActiveIncrement(); - metrics.transactionsActiveIncrement(); - } catch (IgniteInternalCheckedException e) { - tx.rollback(); - throw new IgniteInternalException(e.getMessage(), e); - } - }); + return null; + } catch (IgniteInternalCheckedException e) { + tx.rollback(); + throw e; + } } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java index 46895e3ac1..e1c3ca42d4 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java @@ -33,7 +33,7 @@ public interface ClientChannel extends AutoCloseable { * @param <T> Response type. * @return Future for the operation. */ - public <T> CompletableFuture<T> serviceAsync( + <T> CompletableFuture<T> serviceAsync( int opCode, PayloadWriter payloadWriter, PayloadReader<T> payloadReader @@ -44,19 +44,26 @@ public interface ClientChannel extends AutoCloseable { * * @return {@code True} channel is closed. */ - public boolean closed(); + boolean closed(); /** * Returns protocol context. * * @return Protocol context. */ - public ProtocolContext protocolContext(); + ProtocolContext protocolContext(); /** * Add topology change listener. * * @param listener Listener. */ - public void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener); + void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener); + + /** + * Add observable timestamp listener. + * + * @param listener Listener. + */ + void addObservableTimestampListener(Consumer<Long> listener); } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java index 0239296e59..c24098c1c2 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java @@ -106,6 +106,9 @@ public final class ReliableChannel implements AutoCloseable { * the table will compare its version with channel version to detect an update. */ private final AtomicLong assignmentVersion = new AtomicLong(); + /** Observable timestamp, or causality token. Sent by the server with every response, and required by some requests. */ + private final AtomicLong observableTimestamp = new AtomicLong(); + /** Cluster id from the first handshake. */ private final AtomicReference<UUID> clusterId = new AtomicReference<>(); @@ -180,6 +183,10 @@ public final class ReliableChannel implements AutoCloseable { return clientCfg; } + public long observableTimestamp() { + return observableTimestamp.get(); + } + /** * Sends request and handles response asynchronously. * @@ -649,6 +656,21 @@ public final class ReliableChannel implements AutoCloseable { } } + private void onObservableTimestampReceived(Long newTs) { + // Atomically update the observable timestamp to max(newTs, curTs). + while (true) { + long curTs = observableTimestamp.get(); + + if (curTs >= newTs) { + break; + } + + if (observableTimestamp.compareAndSet(curTs, newTs)) { + break; + } + } + } + private void onTopologyAssignmentChanged(ClientChannel clientChannel) { // NOTE: Multiple channels will send the same update to us, resulting in multiple cache invalidations. // This could be solved with a cluster-wide AssignmentVersion, but we don't have that. @@ -793,6 +815,7 @@ public final class ReliableChannel implements AutoCloseable { } ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged); + ch.addObservableTimestampListener(ReliableChannel.this::onObservableTimestampReceived); ClusterNode newNode = ch.protocolContext().clusterNode(); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java index 61f72bbebb..e11d3dd4de 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java @@ -102,6 +102,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon /** Topology change listeners. */ private final Collection<Consumer<ClientChannel>> assignmentChangeListeners = new CopyOnWriteArrayList<>(); + /** Observable timestamp listeners. */ + private final Collection<Consumer<Long>> observableTimestampListeners = new CopyOnWriteArrayList<>(); + /** Closed flag. */ private final AtomicBoolean closed = new AtomicBoolean(); @@ -402,6 +405,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon } } + long observableTimestamp = unpacker.unpackLong(); + + for (Consumer<Long> listener : observableTimestampListeners) { + listener.accept(observableTimestamp); + } + if (unpacker.tryUnpackNil()) { boolean completed = pendingReq.complete(unpacker); @@ -489,6 +498,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon assignmentChangeListeners.add(listener); } + @Override + public void addObservableTimestampListener(Consumer<Long> listener) { + observableTimestampListeners.add(listener); + } + private static void validateConfiguration(ClientChannelConfiguration cfg) { String error = null; diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java index 970bdd4eae..6f309d293c 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java @@ -158,6 +158,8 @@ public class ClientSession implements Session { w.out().packString(clientStatement.query()); w.out().packObjectArrayAsBinaryTuple(arguments); + + w.out().packLong(ch.observableTimestamp()); }; PayloadReader<AsyncResultSet<T>> payloadReader = r -> new ClientAsyncResultSet<>(r.clientChannel(), r.in(), mapper); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java index b2102b4e8f..8944d353a5 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java @@ -64,7 +64,10 @@ public class ClientTransactions implements IgniteTransactions { return ch.serviceAsync( ClientOp.TX_BEGIN, - w -> w.out().packBoolean(readOnly), + w -> { + w.out().packBoolean(readOnly); + w.out().packLong(ch.observableTimestamp()); + }, r -> readTx(r, readOnly)); } diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java index ae52552461..8b83a66c3f 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java @@ -223,7 +223,7 @@ public class ClientMetricsTest extends BaseIgniteAbstractTest { client.tables().tables(); assertEquals(21, metrics().bytesSent()); - assertEquals(55, metrics().bytesReceived()); + assertEquals(64, metrics().bytesReceived()); } @Test diff --git a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java new file mode 100644 index 0000000000..78b4c31293 --- /dev/null +++ b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import static org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.client.fakes.FakeIgnite; +import org.apache.ignite.internal.TestHybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.tx.TransactionOptions; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +/** + * Tests that observable timestamp (causality token) is propagated from server to client and back. + */ +public class ObservableTimestampPropagationTest { + private static TestServer testServer; + + private static FakeIgnite ignite; + + private static IgniteClient client; + + private static final AtomicLong currentServerTimestamp = new AtomicLong(1); + + @BeforeAll + public static void startServer2() { + TestHybridClock clock = new TestHybridClock(currentServerTimestamp::get); + + ignite = new FakeIgnite("server-2"); + testServer = new TestServer(0, ignite, null, null, "server-2", UUID.randomUUID(), null, null, clock); + + client = IgniteClient.builder().addresses("127.0.0.1:" + testServer.port()).build(); + } + + @AfterAll + public static void stopServer2() throws Exception { + IgniteUtils.closeAll(client, testServer, ignite); + } + + @Test + public void testClientPropagatesLatestKnownHybridTimestamp() { + assertNull(lastObservableTimestamp()); + + // RW TX does not propagate timestamp. + client.transactions().begin(); + assertNull(lastObservableTimestamp()); + + // RO TX propagates timestamp. + client.transactions().begin(new TransactionOptions().readOnly(true)); + assertEquals(1, lastObservableTimestamp()); + + // Increase timestamp on server - client does not know about it initially. + currentServerTimestamp.set(11); + client.transactions().begin(new TransactionOptions().readOnly(true)); + assertEquals(1, lastObservableTimestamp()); + + // Subsequent RO TX propagates latest known timestamp. + client.tables().tables(); + client.transactions().begin(new TransactionOptions().readOnly(true)); + assertEquals(11, lastObservableTimestamp()); + + // Smaller timestamp from server is ignored by client. + currentServerTimestamp.set(9); + client.transactions().begin(new TransactionOptions().readOnly(true)); + client.transactions().begin(new TransactionOptions().readOnly(true)); + assertEquals(11, lastObservableTimestamp()); + } + + private static @Nullable Long lastObservableTimestamp() { + HybridTimestamp ts = ignite.txManager().lastObservableTimestamp(); + + return ts == null ? null : ts.longValue() >> LOGICAL_TIME_BITS_SIZE; + } +} diff --git a/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java b/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java index e28efff465..cbdcc47435 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java @@ -65,7 +65,7 @@ public class RequestBalancingTest extends BaseIgniteAbstractTest { assertTrue(IgniteTestUtils.waitForCondition(() -> client.connections().size() == 3, 3000)); // Execute on unknown node to fall back to balancing. - List<String> res = IntStream.range(0, 5) + List<Object> res = IntStream.range(0, 5) .mapToObj(i -> client.compute().<String>executeAsync(getClusterNodes("s123"), List.of(), "job").join()) .collect(Collectors.toList()); diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java index c91537c005..3c439c2242 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java @@ -40,11 +40,13 @@ import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.internal.client.proto.ClientMessageDecoder; import org.apache.ignite.internal.configuration.AuthenticationConfiguration; import org.apache.ignite.internal.configuration.ConfigurationRegistry; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.security.authentication.AuthenticationManager; import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NettyBootstrapFactory; @@ -78,6 +80,9 @@ public class TestClientHandlerModule implements IgniteComponent { /** Metrics. */ private final ClientHandlerMetricSource metrics; + /** Clock. */ + private final HybridClock clock; + /** Netty channel. */ private volatile Channel channel; @@ -99,6 +104,7 @@ public class TestClientHandlerModule implements IgniteComponent { * @param compute Compute. * @param clusterId Cluster id. * @param metrics Metrics. + * @param clock Clock. */ public TestClientHandlerModule( Ignite ignite, @@ -110,7 +116,8 @@ public class TestClientHandlerModule implements IgniteComponent { IgniteCompute compute, UUID clusterId, ClientHandlerMetricSource metrics, - AuthenticationConfiguration authenticationConfiguration) { + AuthenticationConfiguration authenticationConfiguration, + HybridClock clock) { assert ignite != null; assert registry != null; assert bootstrapFactory != null; @@ -125,6 +132,7 @@ public class TestClientHandlerModule implements IgniteComponent { this.clusterId = clusterId; this.metrics = metrics; this.authenticationConfiguration = authenticationConfiguration; + this.clock = clock; } /** {@inheritDoc} */ @@ -184,7 +192,7 @@ public class TestClientHandlerModule implements IgniteComponent { new ResponseDelayHandler(responseDelay), new ClientInboundMessageHandler( (IgniteTablesInternal) ignite.tables(), - ignite.transactions(), + (IgniteTransactionsImpl) ignite.transactions(), mock(QueryProcessor.class), configuration, compute, @@ -192,7 +200,8 @@ public class TestClientHandlerModule implements IgniteComponent { ignite.sql(), clusterId, metrics, - authenticationManager(authenticationConfiguration))); + authenticationManager(authenticationConfiguration), + clock)); } }) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout()); diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java index 1aba74f063..3be82094ea 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java +++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java @@ -45,12 +45,15 @@ import org.apache.ignite.internal.configuration.ConfigurationRegistry; import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage; import org.apache.ignite.internal.configuration.validation.TestConfigurationValidator; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metrics.MetricManager; import org.apache.ignite.internal.network.configuration.NetworkConfiguration; import org.apache.ignite.internal.security.authentication.AuthenticationManager; import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl; import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterService; import org.apache.ignite.network.NettyBootstrapFactory; @@ -94,6 +97,33 @@ public class TestServer implements AutoCloseable { null, UUID.randomUUID(), null, + null, + null + ); + } + + /** + * Constructor. + */ + public TestServer( + long idleTimeout, + Ignite ignite, + @Nullable Function<Integer, Boolean> shouldDropConnection, + @Nullable Function<Integer, Integer> responseDelay, + @Nullable String nodeName, + UUID clusterId, + @Nullable AuthenticationConfiguration authenticationConfiguration, + @Nullable Integer port + ) { + this( + idleTimeout, + ignite, + shouldDropConnection, + responseDelay, + nodeName, + clusterId, + authenticationConfiguration, + port, null ); } @@ -112,7 +142,8 @@ public class TestServer implements AutoCloseable { @Nullable String nodeName, UUID clusterId, @Nullable AuthenticationConfiguration authenticationConfiguration, - @Nullable Integer port + @Nullable Integer port, + @Nullable HybridClock clock ) { ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); @@ -156,6 +187,11 @@ public class TestServer implements AutoCloseable { AuthenticationConfiguration authenticationConfigToApply = authenticationConfiguration == null ? mock(AuthenticationConfiguration.class) : authenticationConfiguration; + + if (clock == null) { + clock = new HybridClockImpl(); + } + module = shouldDropConnection != null ? new TestClientHandlerModule( ignite, @@ -167,11 +203,12 @@ public class TestServer implements AutoCloseable { compute, clusterId, metrics, - authenticationConfigToApply) + authenticationConfigToApply, + clock) : new ClientHandlerModule( ((FakeIgnite) ignite).queryEngine(), (IgniteTablesInternal) ignite.tables(), - ignite.transactions(), + (IgniteTransactionsImpl) ignite.transactions(), cfg, compute, clusterService, @@ -181,7 +218,8 @@ public class TestServer implements AutoCloseable { mock(MetricManager.class), metrics, authenticationManager(authenticationConfigToApply), - authenticationConfigToApply + authenticationConfigToApply, + clock ); module.start(); diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java index cd692c8726..c9eb41018e 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java @@ -18,26 +18,17 @@ package org.apache.ignite.client.fakes; import java.util.Collection; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.Ignite; import org.apache.ignite.compute.IgniteCompute; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; -import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.sql.engine.QueryProcessor; -import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.internal.tx.TxState; -import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.manager.IgniteTables; import org.apache.ignite.tx.IgniteTransactions; -import org.apache.ignite.tx.Transaction; -import org.apache.ignite.tx.TransactionException; -import org.apache.ignite.tx.TransactionOptions; -import org.jetbrains.annotations.NotNull; /** * Fake Ignite. @@ -47,6 +38,8 @@ public class FakeIgnite implements Ignite { private final HybridClock clock = new HybridClockImpl(); + private final FakeTxManager txMgr = new FakeTxManager(clock); + /** * Default constructor. */ @@ -79,91 +72,7 @@ public class FakeIgnite implements Ignite { /** {@inheritDoc} */ @Override public IgniteTransactions transactions() { - return new IgniteTransactions() { - @Override - public Transaction begin(TransactionOptions options) { - return beginAsync(options).join(); - } - - @Override - public CompletableFuture<Transaction> beginAsync(TransactionOptions options) { - return CompletableFuture.completedFuture(new InternalTransaction() { - private final UUID id = UUID.randomUUID(); - - private final HybridTimestamp timestamp = clock.now(); - - @Override - public @NotNull UUID id() { - return id; - } - - @Override - public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(TablePartitionId tablePartitionId) { - return null; - } - - @Override - public TxState state() { - return null; - } - - @Override - public boolean assignCommitPartition(TablePartitionId tablePartitionId) { - return false; - } - - @Override - public TablePartitionId commitPartition() { - return null; - } - - @Override - public IgniteBiTuple<ClusterNode, Long> enlist( - TablePartitionId tablePartitionId, - IgniteBiTuple<ClusterNode, Long> nodeAndTerm) { - return null; - } - - @Override - public void enlistResultFuture(CompletableFuture<?> resultFuture) {} - - @Override - public void commit() throws TransactionException { - - } - - @Override - public CompletableFuture<Void> commitAsync() { - return CompletableFuture.completedFuture(null); - } - - @Override - public void rollback() throws TransactionException { - - } - - @Override - public CompletableFuture<Void> rollbackAsync() { - return CompletableFuture.completedFuture(null); - } - - @Override - public boolean isReadOnly() { - return false; - } - - @Override - public HybridTimestamp readTimestamp() { - return null; - } - - @Override - public HybridTimestamp startTimestamp() { - return timestamp; - } - }); - } - }; + return new IgniteTransactionsImpl(txMgr); } /** {@inheritDoc} */ @@ -201,4 +110,8 @@ public class FakeIgnite implements Ignite { public String name() { return name; } + + public FakeTxManager txManager() { + return txMgr; + } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java new file mode 100644 index 0000000000..11f33056c3 --- /dev/null +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.fakes; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.tx.InternalTransaction; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.tx.TransactionException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Fake transaction manager. + */ +public class FakeTxManager implements TxManager { + private final HybridClock clock; + + private HybridTimestamp lastObservableTimestamp = null; + + public FakeTxManager(HybridClock clock) { + this.clock = clock; + } + + @Override + public void start() { + // No-op. + } + + @Override + public void stop() throws Exception { + // No-op. + } + + @Override + public InternalTransaction begin() { + return begin(false, null); + } + + @Override + public InternalTransaction begin(boolean readOnly, @Nullable HybridTimestamp observableTimestamp) { + lastObservableTimestamp = observableTimestamp; + + return new InternalTransaction() { + private final UUID id = UUID.randomUUID(); + + private final HybridTimestamp timestamp = clock.now(); + + @Override + public @NotNull UUID id() { + return id; + } + + @Override + public IgniteBiTuple<ClusterNode, Long> enlistedNodeAndTerm(TablePartitionId tablePartitionId) { + return null; + } + + @Override + public TxState state() { + return null; + } + + @Override + public boolean assignCommitPartition(TablePartitionId tablePartitionId) { + return false; + } + + @Override + public TablePartitionId commitPartition() { + return null; + } + + @Override + public IgniteBiTuple<ClusterNode, Long> enlist( + TablePartitionId tablePartitionId, + IgniteBiTuple<ClusterNode, Long> nodeAndTerm) { + return null; + } + + @Override + public void enlistResultFuture(CompletableFuture<?> resultFuture) { + } + + @Override + public void commit() throws TransactionException { + + } + + @Override + public CompletableFuture<Void> commitAsync() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void rollback() throws TransactionException { + + } + + @Override + public CompletableFuture<Void> rollbackAsync() { + return CompletableFuture.completedFuture(null); + } + + @Override + public boolean isReadOnly() { + return false; + } + + @Override + public HybridTimestamp readTimestamp() { + return observableTimestamp; + } + + @Override + public HybridTimestamp startTimestamp() { + return timestamp; + } + }; + } + + @Override + public @Nullable TxState state(UUID txId) { + return null; + } + + @Override + public void changeState(UUID txId, @Nullable TxState before, TxState after) { + + } + + @Override + public LockManager lockManager() { + return null; + } + + @Override + public CompletableFuture<Void> finish(TablePartitionId commitPartition, ClusterNode recipientNode, Long term, boolean commit, + Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>> groups, UUID txId) { + return null; + } + + @Override + public CompletableFuture<Void> cleanup(ClusterNode recipientNode, List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds, + UUID txId, boolean commit, @Nullable HybridTimestamp commitTimestamp) { + return null; + } + + @Override + public int finished() { + return 0; + } + + @Override + public int pending() { + return 0; + } + + @Override + public CompletableFuture<Void> updateLowWatermark(HybridTimestamp newLowWatermark) { + return null; + } + + public @Nullable HybridTimestamp lastObservableTimestamp() { + return lastObservableTimestamp; + } +} diff --git a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java index 6a681d8e26..63041b71f2 100644 --- a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java +++ b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java @@ -204,11 +204,16 @@ public class RepeatedFinishClientTransactionTest extends BaseIgniteAbstractTest @Override public void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener) { + // No-op. + } + @Override + public void addObservableTimestampListener(Consumer<Long> listener) { + // No-op. } @Override - public void close() throws Exception { + public void close() { } } diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp index e90ad52ecb..1c9972cbd2 100644 --- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp +++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp @@ -91,6 +91,9 @@ void node_connection::process_message(bytes_view msg) { auto flags = reader.read_int32(); UNUSED_VALUE flags; // Flags are unused for now. + auto observable_timestamp = reader.read_int64(); + UNUSED_VALUE observable_timestamp; // // TODO IGNITE-20057 C++ client: Track observable timestamp + auto handler = get_and_remove_handler(req_id); if (!handler) { m_logger->log_error("Missing handler for request with id=" + std::to_string(req_id)); diff --git a/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp b/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp index 39e4eff4c5..10a36d2257 100644 --- a/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp +++ b/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp @@ -65,27 +65,29 @@ void sql_impl::execute_async(transaction *tx, const sql_statement &statement, st if (args.empty()) { writer.write_nil(); - return; - } + } else { + auto args_num = std::int32_t(args.size()); - auto args_num = std::int32_t(args.size()); + writer.write(args_num); - writer.write(args_num); + binary_tuple_builder args_builder{args_num * 3}; - binary_tuple_builder args_builder{args_num * 3}; + args_builder.start(); + for (const auto &arg : args) { + protocol::claim_primitive_with_type(args_builder, arg); + } - args_builder.start(); - for (const auto &arg : args) { - protocol::claim_primitive_with_type(args_builder, arg); - } + args_builder.layout(); + for (const auto &arg : args) { + protocol::append_primitive_with_type(args_builder, arg); + } - args_builder.layout(); - for (const auto &arg : args) { - protocol::append_primitive_with_type(args_builder, arg); + auto args_data = args_builder.build(); + writer.write_binary(args_data); } - auto args_data = args_builder.build(); - writer.write_binary(args_data); + // TODO IGNITE-20057 C++ client: Track observable timestamp + writer.write(0); // observableTimestamp. }; auto reader_func = [](std::shared_ptr<node_connection> channel, bytes_view msg) -> result_set { diff --git a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h index 1b1aa3c943..98f792367c 100644 --- a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h +++ b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h @@ -57,6 +57,9 @@ public: IGNITE_API void begin_async(ignite_callback<transaction> callback) { auto writer_func = [](protocol::writer &writer) { writer.write_bool(false); // readOnly. + + // TODO IGNITE-20057 C++ client: Track observable timestamp + writer.write(0); // observableTimestamp. }; auto reader_func = [](protocol::reader &reader, std::shared_ptr<node_connection> conn) mutable -> transaction { diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp index f59b116eba..9d039567c0 100644 --- a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp +++ b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp @@ -344,6 +344,9 @@ sql_result data_query::make_request_execute() { writer.write(m_query); m_params.write(writer); + + // TODO IGNITE-20057 C++ client: Track observable timestamp + writer.write(0); // observableTimestamp. }); m_connection.mark_transaction_non_empty(); diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp index 81b14139d9..9b3130da7d 100644 --- a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp +++ b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp @@ -325,6 +325,9 @@ network::data_buffer_owning sql_connection::receive_message(std::int64_t id, std auto flags = reader.read_int32(); UNUSED_VALUE flags; // Flags are unused for now. + auto observable_timestamp = reader.read_int64(); + UNUSED_VALUE observable_timestamp; // // TODO IGNITE-20057 C++ client: Track observable timestamp + auto err = protocol::read_error(reader); if (err) { throw odbc_error(sql_state::SHY000_GENERAL_ERROR, err.value().what_str()); diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs index 96dadabeff..81c668e199 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs @@ -339,6 +339,7 @@ namespace Apache.Ignite.Tests writer.Write(0); // Message type. writer.Write(requestId); writer.Write(PartitionAssignmentChanged ? (int)ResponseFlags.PartitionAssignmentChanged : 0); + writer.Write(0); // Observable timestamp. if (!isError) { diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs index 5f0258b066..501fb2e362 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs @@ -97,7 +97,7 @@ public class MetricsTests await client.Tables.GetTablesAsync(); Assert.AreEqual(17, _listener.GetMetric("bytes-sent")); - Assert.AreEqual(72, _listener.GetMetric("bytes-received")); + Assert.AreEqual(73, _listener.GetMetric("bytes-received")); } [Test] diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs index 10703c8d9f..8034286d48 100644 --- a/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs +++ b/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs @@ -17,7 +17,6 @@ namespace Apache.Ignite { - using System; using System.Diagnostics.CodeAnalysis; /// <summary> diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs index f9319fc3d7..d61dd176e0 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs @@ -700,6 +700,9 @@ namespace Apache.Ignite.Internal _assignmentChangeCallback(this); } + // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp + _ = reader.ReadInt64(); + var exception = ReadError(ref reader); if (exception != null) diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs index ad8a47ec5b..4476eab85b 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs @@ -419,6 +419,11 @@ internal readonly ref struct MsgPackWriter Write(col.Count); + if (col.Count == 0) + { + return; + } + using var builder = new BinaryTupleBuilder(col.Count * 3); foreach (var obj in col) diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs index 4d93e33270..2d23d8b4d3 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs @@ -190,6 +190,9 @@ namespace Apache.Ignite.Internal.Sql w.Write(statement.Query); w.WriteObjectCollectionAsBinaryTuple(args); + // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp + w.Write(0); + return writer; } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs index 2e875a603d..31129c944c 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs @@ -43,7 +43,7 @@ namespace Apache.Ignite.Internal.Transactions public async Task<ITransaction> BeginAsync(TransactionOptions options) { using var writer = ProtoCommon.GetMessageWriter(); - writer.MessageWriter.Write(options.ReadOnly); + Write(); // Transaction and all corresponding operations must be performed using the same connection. var (resBuf, socket) = await _socket.DoOutInOpAndGetSocketAsync(ClientOp.TxBegin, request: writer).ConfigureAwait(false); @@ -54,6 +54,15 @@ namespace Apache.Ignite.Internal.Transactions return new Transaction(txId, socket, _socket, options.ReadOnly); } + + void Write() + { + var w = writer.MessageWriter; + w.Write(options.ReadOnly); + + // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp + w.Write(0); + } } /// <inheritdoc /> diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index de5e7e783a..27eaf4004f 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -605,7 +605,8 @@ public class IgniteImpl implements Ignite { metricManager, new ClientHandlerMetricSource(), authenticationManager, - authenticationConfiguration + authenticationConfiguration, + clock ); restComponent = createRestComponent(name); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java index 58dc4f93f0..074d54a197 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java @@ -55,7 +55,7 @@ public interface TxManager extends IgniteComponent { * @throws IgniteInternalException with {@link Transactions#TX_READ_ONLY_TOO_OLD_ERR} if transaction much older than the data available * in the tables. */ - InternalTransaction begin(boolean readOnly, HybridTimestamp observableTimestamp); + InternalTransaction begin(boolean readOnly, @Nullable HybridTimestamp observableTimestamp); /** * Returns a transaction state. diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java index a4b8ea297f..bb67f6232b 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.tx.impl; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.tx.IgniteTransactions; import org.apache.ignite.tx.Transaction; @@ -39,25 +41,34 @@ public class IgniteTransactionsImpl implements IgniteTransactions { this.txManager = txManager; } - /** {@inheritDoc} */ - @Override - public Transaction begin(@Nullable TransactionOptions options) { + /** + * Begins a transaction. + * + * @param options Transaction options. + * @param observableTimestamp Observable timestamp, applicable only for read-only transactions. Read-only transactions + * can use some time to the past to avoid waiting for time that is safe for reading on non-primary replica. To do so, client + * should provide this observable timestamp that is calculated according to the commit time of the latest read-write transaction, + * to guarantee that read-only transaction will see the modified data. + * @return The started transaction. + */ + public InternalTransaction begin(@Nullable TransactionOptions options, @Nullable HybridTimestamp observableTimestamp) { if (options != null && options.timeoutMillis() != 0) { // TODO: IGNITE-15936. throw new UnsupportedOperationException("Timeouts are not supported yet"); } - return txManager.begin(options != null && options.readOnly(), null); + return txManager.begin(options != null && options.readOnly(), observableTimestamp); } /** {@inheritDoc} */ @Override - public CompletableFuture<Transaction> beginAsync(@Nullable TransactionOptions options) { - if (options != null && options.timeoutMillis() != 0) { - // TODO: IGNITE-15936. - throw new UnsupportedOperationException("Timeouts are not supported yet"); - } + public Transaction begin(@Nullable TransactionOptions options) { + return begin(options, null); + } - return CompletableFuture.completedFuture(txManager.begin(options != null && options.readOnly(), null)); + /** {@inheritDoc} */ + @Override + public CompletableFuture<Transaction> beginAsync(@Nullable TransactionOptions options) { + return CompletableFuture.completedFuture(begin(options, null)); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index f60d5720dd..f1147dd08c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -117,7 +117,7 @@ public class TxManagerImpl implements TxManager { } @Override - public InternalTransaction begin(boolean readOnly, HybridTimestamp observableTimestamp) { + public InternalTransaction begin(boolean readOnly, @Nullable HybridTimestamp observableTimestamp) { assert readOnly || observableTimestamp == null : "Observable timestamp is applicable just for read-only transactions."; HybridTimestamp beginTimestamp = clock.now();