This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 37fe3e86f1 IGNITE-19888 Java client: Track observable timestamp (#2371)
37fe3e86f1 is described below

commit 37fe3e86f176e3ca43b605d42362dfd9acee1583
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Tue Aug 1 14:14:30 2023 +0300

    IGNITE-19888 Java client: Track observable timestamp (#2371)
    
    * Add observable timestamp to the standard response header in the client protocol
      (including error responses: data can be modified before the error occurs, e.g. in
      a compute task, so the client-side timestamp must still be updated)
    * Add observable timestamp to TX_BEGIN and SQL_EXEC client requests
    * Propagate latest known timestamp in Java client
    * Fix .NET and C++ clients to ignore the timestamp for now (separate
      tickets exist)
---
 .../internal/client/proto/ClientMessagePacker.java |  50 ++++++
 modules/client-handler/build.gradle                |   2 +
 .../apache/ignite/client/handler/TestServer.java   |  22 ++-
 .../ignite/client/handler/ClientHandlerModule.java |  18 +-
 .../handler/ClientInboundMessageHandler.java       |  40 ++++-
 .../requests/sql/ClientSqlExecuteRequest.java      |  16 +-
 .../handler/requests/table/ClientTableCommon.java  |  16 +-
 .../table/ClientTupleContainsKeyRequest.java       |   2 +-
 .../table/ClientTupleDeleteAllExactRequest.java    |   2 +-
 .../table/ClientTupleDeleteAllRequest.java         |   2 +-
 .../table/ClientTupleDeleteExactRequest.java       |   2 +-
 .../requests/table/ClientTupleDeleteRequest.java   |   2 +-
 .../requests/table/ClientTupleGetAllRequest.java   |   2 +-
 .../table/ClientTupleGetAndDeleteRequest.java      |   2 +-
 .../table/ClientTupleGetAndReplaceRequest.java     |   2 +-
 .../table/ClientTupleGetAndUpsertRequest.java      |   2 +-
 .../requests/table/ClientTupleGetRequest.java      |   2 +-
 .../table/ClientTupleInsertAllRequest.java         |   2 +-
 .../requests/table/ClientTupleInsertRequest.java   |   2 +-
 .../table/ClientTupleReplaceExactRequest.java      |   2 +-
 .../requests/table/ClientTupleReplaceRequest.java  |   2 +-
 .../table/ClientTupleUpsertAllRequest.java         |   2 +-
 .../requests/table/ClientTupleUpsertRequest.java   |   2 +-
 .../requests/tx/ClientTransactionBeginRequest.java |  47 +++--
 .../ignite/internal/client/ClientChannel.java      |  15 +-
 .../ignite/internal/client/ReliableChannel.java    |  23 +++
 .../ignite/internal/client/TcpClientChannel.java   |  14 ++
 .../ignite/internal/client/sql/ClientSession.java  |   2 +
 .../internal/client/tx/ClientTransactions.java     |   5 +-
 .../apache/ignite/client/ClientMetricsTest.java    |   2 +-
 .../client/ObservableTimestampPropagationTest.java |  97 +++++++++++
 .../apache/ignite/client/RequestBalancingTest.java |   2 +-
 .../ignite/client/TestClientHandlerModule.java     |  15 +-
 .../java/org/apache/ignite/client/TestServer.java  |  46 ++++-
 .../org/apache/ignite/client/fakes/FakeIgnite.java | 103 +----------
 .../apache/ignite/client/fakes/FakeTxManager.java  | 191 +++++++++++++++++++++
 .../RepeatedFinishClientTransactionTest.java       |   7 +-
 .../cpp/ignite/client/detail/node_connection.cpp   |   3 +
 .../cpp/ignite/client/detail/sql/sql_impl.cpp      |  30 ++--
 .../client/detail/transaction/transactions_impl.h  |   3 +
 .../platforms/cpp/ignite/odbc/query/data_query.cpp |   3 +
 .../platforms/cpp/ignite/odbc/sql_connection.cpp   |   3 +
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |   1 +
 .../dotnet/Apache.Ignite.Tests/MetricsTests.cs     |   2 +-
 .../platforms/dotnet/Apache.Ignite/ErrorGroups.cs  |   1 -
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |   3 +
 .../Internal/Proto/MsgPack/MsgPackWriter.cs        |   5 +
 .../dotnet/Apache.Ignite/Internal/Sql/Sql.cs       |   3 +
 .../Internal/Transactions/Transactions.cs          |  11 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../org/apache/ignite/internal/tx/TxManager.java   |   2 +-
 .../internal/tx/impl/IgniteTransactionsImpl.java   |  31 ++--
 .../ignite/internal/tx/impl/TxManagerImpl.java     |   2 +-
 53 files changed, 680 insertions(+), 191 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 3584b6cdad..f4ef190be4 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -46,6 +46,11 @@ public class ClientMessagePacker implements AutoCloseable {
      */
     private boolean closed;
 
+    /**
+     * Metadata.
+     */
+    private @Nullable Object meta;
+
     /**
      * Constructor.
      *
@@ -179,6 +184,29 @@ public class ClientMessagePacker implements AutoCloseable {
         }
     }
 
+    /**
+     * Reserve space for long value.
+     *
+     * @return Index of reserved space.
+     */
+    public int reserveLong() {
+        buf.writeByte(Code.INT64);
+        var index = buf.writerIndex();
+
+        buf.writeLong(0);
+        return index;
+    }
+
+    /**
+     * Set long value at reserved index (see {@link #reserveLong()}).
+     *
+     * @param index Index.
+     * @param v Value.
+     */
+    public void setLong(int index, long v) {
+        buf.setLong(index, v);
+    }
+
     /**
      * Writes a long value.
      *
@@ -559,6 +587,10 @@ public class ClientMessagePacker implements AutoCloseable {
 
         packInt(vals.length);
 
+        if (vals.length == 0) {
+            return;
+        }
+
         // Builder with inline schema.
         // Every element in vals is represented by 3 tuple elements: type, 
scale, value.
         var builder = new BinaryTupleBuilder(vals.length * 3);
@@ -631,6 +663,24 @@ public class ClientMessagePacker implements AutoCloseable {
         writePayload(buf);
     }
 
+    /**
+     * Gets metadata.
+     *
+     * @return Metadata.
+     */
+    public @Nullable Object meta() {
+        return meta;
+    }
+
+    /**
+     * Sets metadata.
+     *
+     * @param meta Metadata.
+     */
+    public void meta(@Nullable Object meta) {
+        this.meta = meta;
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/modules/client-handler/build.gradle 
b/modules/client-handler/build.gradle
index 008e6208ca..17907c6f81 100644
--- a/modules/client-handler/build.gradle
+++ b/modules/client-handler/build.gradle
@@ -35,6 +35,7 @@ dependencies {
     implementation project(':ignite-schema')
     implementation project(':ignite-security')
     implementation project(':ignite-metrics')
+    implementation project(':ignite-transactions')
     implementation libs.jetbrains.annotations
     implementation libs.fastutil.core
     implementation libs.slf4j.jdk14
@@ -59,6 +60,7 @@ dependencies {
     integrationTestImplementation project(':ignite-table')
     integrationTestImplementation project(':ignite-metrics')
     integrationTestImplementation project(':ignite-security')
+    integrationTestImplementation project(':ignite-transactions')
     
integrationTestImplementation(testFixtures(project(':ignite-configuration')))
     integrationTestImplementation(testFixtures(project(':ignite-core')))
     integrationTestImplementation libs.msgpack.core
diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
index 552806934a..b1ddc6c992 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java
@@ -32,16 +32,17 @@ import 
org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
 import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManager;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.TestInfo;
 import org.mockito.Mockito;
 
@@ -115,10 +116,21 @@ public class TestServer {
         
Mockito.when(clusterService.topologyService().localMember().id()).thenReturn("id");
         
Mockito.when(clusterService.topologyService().localMember().name()).thenReturn("consistent-id");
 
-        var module = new ClientHandlerModule(mock(QueryProcessor.class), 
mock(IgniteTablesInternal.class), mock(IgniteTransactions.class),
-                registry, mock(IgniteCompute.class), clusterService, 
bootstrapFactory, mock(IgniteSql.class),
-                () -> CompletableFuture.completedFuture(UUID.randomUUID()), 
mock(MetricManager.class), metrics,
-                authenticationManager(), authenticationConfiguration
+        var module = new ClientHandlerModule(
+                mock(QueryProcessor.class),
+                mock(IgniteTablesInternal.class),
+                mock(IgniteTransactionsImpl.class),
+                registry,
+                mock(IgniteCompute.class),
+                clusterService,
+                bootstrapFactory,
+                mock(IgniteSql.class),
+                () -> CompletableFuture.completedFuture(UUID.randomUUID()),
+                mock(MetricManager.class),
+                metrics,
+                authenticationManager(),
+                authenticationConfiguration,
+                new HybridClockImpl()
         );
 
         module.start();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index a8fe8e1f79..6dacd07709 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -36,6 +36,7 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
 import org.apache.ignite.internal.configuration.AuthenticationConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
@@ -44,13 +45,13 @@ import 
org.apache.ignite.internal.network.ssl.SslContextProvider;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManager;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.lang.ErrorGroups;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.tx.IgniteTransactions;
 
 /**
  * Client handler module maintains TCP endpoint for thin client connections.
@@ -66,7 +67,7 @@ public class ClientHandlerModule implements IgniteComponent {
     private final IgniteTablesInternal igniteTables;
 
     /** Ignite transactions API. */
-    private final IgniteTransactions igniteTransactions;
+    private final IgniteTransactionsImpl igniteTransactions;
 
     /** Ignite SQL API. */
     private final IgniteSql sql;
@@ -102,6 +103,8 @@ public class ClientHandlerModule implements IgniteComponent 
{
 
     private final AuthenticationConfiguration authenticationConfiguration;
 
+    private final HybridClock clock;
+
     /**
      * Constructor.
      *
@@ -117,11 +120,12 @@ public class ClientHandlerModule implements 
IgniteComponent {
      * @param metricManager Metric manager.
      * @param authenticationManager Authentication manager.
      * @param authenticationConfiguration Authentication configuration.
+     * @param clock Hybrid clock.
      */
     public ClientHandlerModule(
             QueryProcessor queryProcessor,
             IgniteTablesInternal igniteTables,
-            IgniteTransactions igniteTransactions,
+            IgniteTransactionsImpl igniteTransactions,
             ConfigurationRegistry registry,
             IgniteCompute igniteCompute,
             ClusterService clusterService,
@@ -131,7 +135,8 @@ public class ClientHandlerModule implements IgniteComponent 
{
             MetricManager metricManager,
             ClientHandlerMetricSource metrics,
             AuthenticationManager authenticationManager,
-            AuthenticationConfiguration authenticationConfiguration) {
+            AuthenticationConfiguration authenticationConfiguration,
+            HybridClock clock) {
         assert igniteTables != null;
         assert registry != null;
         assert queryProcessor != null;
@@ -144,6 +149,7 @@ public class ClientHandlerModule implements IgniteComponent 
{
         assert metrics != null;
         assert authenticationManager != null;
         assert authenticationConfiguration != null;
+        assert clock != null;
 
         this.queryProcessor = queryProcessor;
         this.igniteTables = igniteTables;
@@ -158,6 +164,7 @@ public class ClientHandlerModule implements IgniteComponent 
{
         this.metrics = metrics;
         this.authenticationManager = authenticationManager;
         this.authenticationConfiguration = authenticationConfiguration;
+        this.clock = clock;
     }
 
     /** {@inheritDoc} */
@@ -291,7 +298,8 @@ public class ClientHandlerModule implements IgniteComponent 
{
                 sql,
                 clusterId,
                 metrics,
-                authenticationManager);
+                authenticationManager,
+                clock);
         authenticationConfiguration.listen(clientInboundMessageHandler);
         return clientInboundMessageHandler;
     }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index ef195a296d..7e648904b8 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -92,6 +92,8 @@ import 
org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ResponseFlags;
 import org.apache.ignite.internal.client.proto.ServerMessageType;
 import org.apache.ignite.internal.configuration.AuthenticationView;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
 import org.apache.ignite.internal.jdbc.proto.JdbcQueryEventHandler;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -104,6 +106,7 @@ import 
org.apache.ignite.internal.security.authentication.UserDetails;
 import 
org.apache.ignite.internal.security.authentication.UsernamePasswordRequest;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -113,7 +116,6 @@ import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.security.AuthenticationException;
 import org.apache.ignite.security.AuthenticationType;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.tx.IgniteTransactions;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -128,7 +130,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
     private final IgniteTablesInternal igniteTables;
 
     /** Ignite transactions API. */
-    private final IgniteTransactions igniteTransactions;
+    private final IgniteTransactionsImpl igniteTransactions;
 
     /** JDBC Handler. */
     private final JdbcQueryEventHandler jdbcQueryEventHandler;
@@ -157,6 +159,9 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
     /** Metrics. */
     private final ClientHandlerMetricSource metrics;
 
+    /** Hybrid clock. */
+    private final HybridClock clock;
+
     /** Context. */
     private ClientContext clientContext;
 
@@ -185,10 +190,11 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
      * @param clusterId Cluster ID.
      * @param metrics Metrics.
      * @param authenticationManager Authentication manager.
+     * @param clock Hybrid clock.
      */
     public ClientInboundMessageHandler(
             IgniteTablesInternal igniteTables,
-            IgniteTransactions igniteTransactions,
+            IgniteTransactionsImpl igniteTransactions,
             QueryProcessor processor,
             ClientConnectorView configuration,
             IgniteCompute compute,
@@ -196,7 +202,8 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             IgniteSql sql,
             UUID clusterId,
             ClientHandlerMetricSource metrics,
-            AuthenticationManager authenticationManager
+            AuthenticationManager authenticationManager,
+            HybridClock clock
     ) {
         assert igniteTables != null;
         assert igniteTransactions != null;
@@ -208,6 +215,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         assert clusterId != null;
         assert metrics != null;
         assert authenticationManager != null;
+        assert clock != null;
 
         this.igniteTables = igniteTables;
         this.igniteTransactions = igniteTransactions;
@@ -218,6 +226,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
         this.clusterId = clusterId;
         this.metrics = metrics;
         this.authenticationManager = authenticationManager;
+        this.clock = clock;
 
         jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
         jdbcQueryEventHandler = 
@@ -386,6 +395,11 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             packer.packLong(requestId);
             writeFlags(packer, ctx);
 
+            // Include server timestamp in error response as well:
+            // an operation can modify data and then throw an exception (e.g. 
Compute task),
+            // so we still need to update client-side timestamp to preserve 
causality guarantees.
+            packer.packLong(observableTimestamp(null));
+
             writeErrorCore(err, packer);
 
             write(packer, ctx);
@@ -456,6 +470,9 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             out.packInt(ServerMessageType.RESPONSE);
             out.packLong(requestId);
             writeFlags(out, ctx);
+
+            // Observable timestamp should be calculated after the operation 
is processed; reserve space, write later.
+            int observableTimestampIdx = out.reserveLong();
             out.packNil(); // No error.
 
             var fut = processOperation(in, out, opCode);
@@ -463,6 +480,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             if (fut == null) {
                 // Operation completed synchronously.
                 in.close();
+                out.setLong(observableTimestampIdx, observableTimestamp(out));
                 write(out, ctx);
 
                 if (LOG.isTraceEnabled()) {
@@ -486,6 +504,7 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
 
                         metrics.requestsFailedIncrement();
                     } else {
+                        out.setLong(observableTimestampIdx, 
observableTimestamp(out));
                         write(out, ctx);
 
                         metrics.requestsProcessedIncrement();
@@ -724,4 +743,17 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
 
         return null;
     }
+
+    private long observableTimestamp(@Nullable ClientMessagePacker out) {
+        // Certain operations can override the timestamp and provide it in the 
meta object.
+        if (out != null) {
+            Object meta = out.meta();
+
+            if (meta instanceof HybridTimestamp) {
+                return ((HybridTimestamp) meta).longValue();
+            }
+        }
+
+        return clock.now().longValue();
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index b68da2fc36..ad8675dd08 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -33,6 +33,7 @@ import 
org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -68,7 +69,7 @@ public class ClientSqlExecuteRequest {
             IgniteSql sql,
             ClientResourceRegistry resources,
             ClientHandlerMetricSource metrics) {
-        var tx = readTx(in, resources);
+        var tx = readTx(in, out, resources);
         Session session = readSession(in, sql);
         Statement statement = readStatement(in, sql);
         Object[] arguments = in.unpackObjectArrayFromBinaryTuple();
@@ -78,9 +79,20 @@ public class ClientSqlExecuteRequest {
             arguments = ArrayUtils.OBJECT_EMPTY_ARRAY;
         }
 
+        // TODO IGNITE-19898 SQL implicit RO transaction should use 
observation timestamp.
+        HybridTimestamp unused = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
+
         return session
                 .executeAsync(tx, statement, arguments)
-                .thenCompose(asyncResultSet -> writeResultSetAsync(out, 
resources, asyncResultSet, session, metrics));
+                .thenCompose(asyncResultSet -> {
+                    //noinspection StatementWithEmptyBody
+                    if (tx == null) {
+                        // TODO IGNITE-19898 Return readTimestamp from 
implicit RO TX to the client
+                        // out.meta(asyncResultSet.tx().readTimestamp());
+                    }
+
+                    return writeResultSetAsync(out, resources, asyncResultSet, 
session, metrics);
+                });
     }
 
     private static CompletionStage<Void> writeResultSetAsync(
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 0c05718307..ad25f0be1e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -42,13 +42,13 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.TemporalNativeType;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.sql.ColumnType;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.manager.IgniteTables;
-import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -367,16 +367,26 @@ public class ClientTableCommon {
      * Reads transaction.
      *
      * @param in Unpacker.
+     * @param out Packer.
      * @param resources Resource registry.
      * @return Transaction, if present, or null.
      */
-    public static @Nullable Transaction readTx(ClientMessageUnpacker in, 
ClientResourceRegistry resources) {
+    public static @Nullable InternalTransaction readTx(
+            ClientMessageUnpacker in, ClientMessagePacker out, 
ClientResourceRegistry resources) {
         if (in.tryUnpackNil()) {
             return null;
         }
 
         try {
-            return resources.get(in.unpackLong()).get(Transaction.class);
+            var tx = 
resources.get(in.unpackLong()).get(InternalTransaction.class);
+
+            if (tx != null && tx.isReadOnly()) {
+                // For read-only tx, override observable timestamp that we 
send to the client:
+                // use readTimestamp() instead of now().
+                out.meta(tx.readTimestamp());
+            }
+
+            return tx;
         } catch (IgniteInternalCheckedException e) {
             throw new IgniteException(e.traceId(), e.code(), e.getMessage(), 
e);
         }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
index e3564e33b8..9e21b2bd46 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleContainsKeyRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var keyTuple = readTuple(in, table, true);
 
             return table.recordView().getAsync(tx, keyTuple)
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
index 7f203d0051..4ebaac2182 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleDeleteAllExactRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuples = readTuples(in, table, false);
 
             return table.recordView().deleteAllExactAsync(tx, tuples)
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
index 3bcb103f66..2b991d444d 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
@@ -49,7 +49,7 @@ public class ClientTupleDeleteAllRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuples = readTuples(in, table, true);
 
             return table.recordView().deleteAllAsync(tx, 
tuples).thenAccept(skippedTuples ->
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
index 0d9a480403..55aac9821e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleDeleteExactRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, false);
 
             return table.recordView().deleteExactAsync(tx, 
tuple).thenAccept(res -> {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
index 9b43617036..cda8f6d7b6 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleDeleteRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, true);
 
             return table.recordView().deleteAsync(tx, tuple).thenAccept(res -> 
{
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
index 3f2b74886b..ca63e74323 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
@@ -49,7 +49,7 @@ public class ClientTupleGetAllRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var keyTuples = readTuples(in, table, true);
 
             return table.recordView().getAllAsync(tx, 
keyTuples).thenAccept(tuples ->
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
index 91197d6bc1..c2b894c70e 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetAndDeleteRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, true);
 
             return table.recordView().getAndDeleteAsync(tx, tuple).thenAccept(
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
index 7a78ecc44c..626750d2f2 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetAndReplaceRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, false);
 
             return table.recordView().getAndReplaceAsync(tx, tuple).thenAccept(
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
index 9245336c45..873a0e6758 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetAndUpsertRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, false);
 
             return table.recordView().getAndUpsertAsync(tx, tuple).thenAccept(
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
index 4d6530a8b7..51c88acfa2 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleGetRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var keyTuple = readTuple(in, table, true);
 
             return table.recordView().getAsync(tx, keyTuple)
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
index 0af2351882..cc7291643b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleInsertAllRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuples = readTuples(in, table, false);
 
             return table.recordView().insertAllAsync(tx, 
tuples).thenAccept(skippedTuples ->
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
index 81cf7f71e9..5398ea50fd 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleInsertRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, false);
 
             return table.recordView().insertAsync(tx, tuple).thenAccept(res -> 
{
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
index 4d10dd7859..0c1955e21b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
@@ -48,7 +48,7 @@ public class ClientTupleReplaceExactRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var schema = readSchema(in, table);
 
             var oldTuple = readTuple(in, false, schema);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
index e7df36a156..46b1f48958 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleReplaceRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, false);
 
             return table.recordView().replaceAsync(tx, tuple).thenAccept(res 
-> {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
index a4f178d1ea..63e9192bba 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleUpsertAllRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuples = readTuples(in, table, false);
 
             return table.recordView().upsertAllAsync(tx, tuples)
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index f78990d921..b1d3bcf204 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -47,7 +47,7 @@ public class ClientTupleUpsertRequest {
             ClientResourceRegistry resources
     ) {
         return readTableAsync(in, tables).thenCompose(table -> {
-            var tx = readTx(in, resources);
+            var tx = readTx(in, out, resources);
             var tuple = readTuple(in, table, false);
 
             return table.recordView().upsertAsync(tx, tuple).thenAccept(v -> 
out.packInt(table.schemaView().lastSchemaVersion()));
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
index 5f05681c57..8464cc4a89 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
@@ -23,10 +23,11 @@ import org.apache.ignite.client.handler.ClientResource;
 import org.apache.ignite.client.handler.ClientResourceRegistry;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Client transaction begin request.
@@ -42,28 +43,42 @@ public class ClientTransactionBeginRequest {
      * @param metrics      Metrics.
      * @return Future.
      */
-    public static CompletableFuture<Void> process(
+    public static @Nullable CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteTransactions transactions,
+            IgniteTransactionsImpl transactions,
             ClientResourceRegistry resources,
-            ClientHandlerMetricSource metrics) {
+            ClientHandlerMetricSource metrics) throws 
IgniteInternalCheckedException {
         TransactionOptions options = null;
+        HybridTimestamp observableTs = null;
 
-        if (in.unpackBoolean()) {
+        boolean readOnly = in.unpackBoolean();
+        if (readOnly) {
             options = new TransactionOptions().readOnly(true);
+
+            // Timestamp makes sense only for read-only transactions.
+            observableTs = 
HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
+        }
+
+        // NOTE: we don't use beginAsync here because it is synchronous anyway.
+        var tx = transactions.begin(options, observableTs);
+
+        if (readOnly) {
+            // For read-only tx, override observable timestamp that we send to the client:
+            // use readTimestamp() instead of now().
+            out.meta(tx.readTimestamp());
         }
 
-        return transactions.beginAsync(options).thenAccept(tx -> {
-            try {
-                long resourceId = resources.put(new ClientResource(tx, 
tx::rollbackAsync));
-                out.packLong(resourceId);
+        try {
+            long resourceId = resources.put(new ClientResource(tx, 
tx::rollbackAsync));
+            out.packLong(resourceId);
+
+            metrics.transactionsActiveIncrement();
 
-                metrics.transactionsActiveIncrement();
-            } catch (IgniteInternalCheckedException e) {
-                tx.rollback();
-                throw new IgniteInternalException(e.getMessage(), e);
-            }
-        });
+            return null;
+        } catch (IgniteInternalCheckedException e) {
+            tx.rollback();
+            throw e;
+        }
     }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
index 46895e3ac1..e1c3ca42d4 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannel.java
@@ -33,7 +33,7 @@ public interface ClientChannel extends AutoCloseable {
      * @param <T>           Response type.
      * @return Future for the operation.
      */
-    public <T> CompletableFuture<T> serviceAsync(
+    <T> CompletableFuture<T> serviceAsync(
             int opCode,
             PayloadWriter payloadWriter,
             PayloadReader<T> payloadReader
@@ -44,19 +44,26 @@ public interface ClientChannel extends AutoCloseable {
      *
      * @return {@code True} channel is closed.
      */
-    public boolean closed();
+    boolean closed();
 
     /**
      * Returns protocol context.
      *
      * @return Protocol context.
      */
-    public ProtocolContext protocolContext();
+    ProtocolContext protocolContext();
 
     /**
      * Add topology change listener.
      *
      * @param listener Listener.
      */
-    public void addTopologyAssignmentChangeListener(Consumer<ClientChannel> 
listener);
+    void addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener);
+
+    /**
+     * Add observable timestamp listener.
+     *
+     * @param listener Listener.
+     */
+    void addObservableTimestampListener(Consumer<Long> listener);
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 0239296e59..c24098c1c2 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -106,6 +106,9 @@ public final class ReliableChannel implements AutoCloseable 
{
      * the table will compare its version with channel version to detect an update. */
     private final AtomicLong assignmentVersion = new AtomicLong();
 
+    /** Observable timestamp, or causality token. Sent by the server with every response, and required by some requests. */
+    private final AtomicLong observableTimestamp = new AtomicLong();
+
     /** Cluster id from the first handshake. */
     private final AtomicReference<UUID> clusterId = new AtomicReference<>();
 
@@ -180,6 +183,10 @@ public final class ReliableChannel implements 
AutoCloseable {
         return clientCfg;
     }
 
+    public long observableTimestamp() {
+        return observableTimestamp.get();
+    }
+
     /**
      * Sends request and handles response asynchronously.
      *
@@ -649,6 +656,21 @@ public final class ReliableChannel implements 
AutoCloseable {
         }
     }
 
+    private void onObservableTimestampReceived(Long newTs) {
+        // Atomically update the observable timestamp to max(newTs, curTs).
+        while (true) {
+            long curTs = observableTimestamp.get();
+
+            if (curTs >= newTs) {
+                break;
+            }
+
+            if (observableTimestamp.compareAndSet(curTs, newTs)) {
+                break;
+            }
+        }
+    }
+
     private void onTopologyAssignmentChanged(ClientChannel clientChannel) {
         // NOTE: Multiple channels will send the same update to us, resulting in multiple cache invalidations.
         // This could be solved with a cluster-wide AssignmentVersion, but we don't have that.
@@ -793,6 +815,7 @@ public final class ReliableChannel implements AutoCloseable 
{
                     }
 
                     
ch.addTopologyAssignmentChangeListener(ReliableChannel.this::onTopologyAssignmentChanged);
+                    
ch.addObservableTimestampListener(ReliableChannel.this::onObservableTimestampReceived);
 
                     ClusterNode newNode = ch.protocolContext().clusterNode();
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 61f72bbebb..e11d3dd4de 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -102,6 +102,9 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
     /** Topology change listeners. */
     private final Collection<Consumer<ClientChannel>> 
assignmentChangeListeners = new CopyOnWriteArrayList<>();
 
+    /** Observable timestamp listeners. */
+    private final Collection<Consumer<Long>> observableTimestampListeners = 
new CopyOnWriteArrayList<>();
+
     /** Closed flag. */
     private final AtomicBoolean closed = new AtomicBoolean();
 
@@ -402,6 +405,12 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             }
         }
 
+        long observableTimestamp = unpacker.unpackLong();
+
+        for (Consumer<Long> listener : observableTimestampListeners) {
+            listener.accept(observableTimestamp);
+        }
+
         if (unpacker.tryUnpackNil()) {
             boolean completed = pendingReq.complete(unpacker);
 
@@ -489,6 +498,11 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         assignmentChangeListeners.add(listener);
     }
 
+    @Override
+    public void addObservableTimestampListener(Consumer<Long> listener) {
+        observableTimestampListeners.add(listener);
+    }
+
     private static void validateConfiguration(ClientChannelConfiguration cfg) {
         String error = null;
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 970bdd4eae..6f309d293c 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -158,6 +158,8 @@ public class ClientSession implements Session {
             w.out().packString(clientStatement.query());
 
             w.out().packObjectArrayAsBinaryTuple(arguments);
+
+            w.out().packLong(ch.observableTimestamp());
         };
 
         PayloadReader<AsyncResultSet<T>> payloadReader = r -> new 
ClientAsyncResultSet<>(r.clientChannel(), r.in(), mapper);
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
index b2102b4e8f..8944d353a5 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
@@ -64,7 +64,10 @@ public class ClientTransactions implements 
IgniteTransactions {
 
         return ch.serviceAsync(
                 ClientOp.TX_BEGIN,
-                w -> w.out().packBoolean(readOnly),
+                w -> {
+                    w.out().packBoolean(readOnly);
+                    w.out().packLong(ch.observableTimestamp());
+                },
                 r -> readTx(r, readOnly));
     }
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index ae52552461..8b83a66c3f 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -223,7 +223,7 @@ public class ClientMetricsTest extends 
BaseIgniteAbstractTest {
         client.tables().tables();
 
         assertEquals(21, metrics().bytesSent());
-        assertEquals(55, metrics().bytesReceived());
+        assertEquals(64, metrics().bytesReceived());
     }
 
     @Test
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
new file mode 100644
index 0000000000..78b4c31293
--- /dev/null
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ObservableTimestampPropagationTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.TestHybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that observable timestamp (causality token) is propagated from server to client and back.
+ */
+public class ObservableTimestampPropagationTest {
+    private static TestServer testServer;
+
+    private static FakeIgnite ignite;
+
+    private static IgniteClient client;
+
+    private static final AtomicLong currentServerTimestamp = new AtomicLong(1);
+
+    @BeforeAll
+    public static void startServer2() {
+        TestHybridClock clock = new 
TestHybridClock(currentServerTimestamp::get);
+
+        ignite = new FakeIgnite("server-2");
+        testServer = new TestServer(0, ignite, null, null, "server-2", 
UUID.randomUUID(), null, null, clock);
+
+        client = IgniteClient.builder().addresses("127.0.0.1:" + 
testServer.port()).build();
+    }
+
+    @AfterAll
+    public static void stopServer2() throws Exception {
+        IgniteUtils.closeAll(client, testServer, ignite);
+    }
+
+    @Test
+    public void testClientPropagatesLatestKnownHybridTimestamp() {
+        assertNull(lastObservableTimestamp());
+
+        // RW TX does not propagate timestamp.
+        client.transactions().begin();
+        assertNull(lastObservableTimestamp());
+
+        // RO TX propagates timestamp.
+        client.transactions().begin(new TransactionOptions().readOnly(true));
+        assertEquals(1, lastObservableTimestamp());
+
+        // Increase timestamp on server - client does not know about it initially.
+        currentServerTimestamp.set(11);
+        client.transactions().begin(new TransactionOptions().readOnly(true));
+        assertEquals(1, lastObservableTimestamp());
+
+        // Subsequent RO TX propagates latest known timestamp.
+        client.tables().tables();
+        client.transactions().begin(new TransactionOptions().readOnly(true));
+        assertEquals(11, lastObservableTimestamp());
+
+        // Smaller timestamp from server is ignored by client.
+        currentServerTimestamp.set(9);
+        client.transactions().begin(new TransactionOptions().readOnly(true));
+        client.transactions().begin(new TransactionOptions().readOnly(true));
+        assertEquals(11, lastObservableTimestamp());
+    }
+
+    private static @Nullable Long lastObservableTimestamp() {
+        HybridTimestamp ts = ignite.txManager().lastObservableTimestamp();
+
+        return ts == null ? null : ts.longValue() >> LOGICAL_TIME_BITS_SIZE;
+    }
+}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
 
b/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
index e28efff465..cbdcc47435 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/RequestBalancingTest.java
@@ -65,7 +65,7 @@ public class RequestBalancingTest extends 
BaseIgniteAbstractTest {
             assertTrue(IgniteTestUtils.waitForCondition(() -> 
client.connections().size() == 3, 3000));
 
             // Execute on unknown node to fall back to balancing.
-            List<String> res = IntStream.range(0, 5)
+            List<Object> res = IntStream.range(0, 5)
                     .mapToObj(i -> 
client.compute().<String>executeAsync(getClusterNodes("s123"), List.of(), 
"job").join())
                     .collect(Collectors.toList());
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index c91537c005..3c439c2242 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -40,11 +40,13 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
 import org.apache.ignite.internal.configuration.AuthenticationConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManager;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
@@ -78,6 +80,9 @@ public class TestClientHandlerModule implements 
IgniteComponent {
     /** Metrics. */
     private final ClientHandlerMetricSource metrics;
 
+    /** Clock. */
+    private final HybridClock clock;
+
     /** Netty channel. */
     private volatile Channel channel;
 
@@ -99,6 +104,7 @@ public class TestClientHandlerModule implements 
IgniteComponent {
      * @param compute Compute.
      * @param clusterId Cluster id.
      * @param metrics Metrics.
+     * @param clock Clock.
      */
     public TestClientHandlerModule(
             Ignite ignite,
@@ -110,7 +116,8 @@ public class TestClientHandlerModule implements 
IgniteComponent {
             IgniteCompute compute,
             UUID clusterId,
             ClientHandlerMetricSource metrics,
-            AuthenticationConfiguration authenticationConfiguration) {
+            AuthenticationConfiguration authenticationConfiguration,
+            HybridClock clock) {
         assert ignite != null;
         assert registry != null;
         assert bootstrapFactory != null;
@@ -125,6 +132,7 @@ public class TestClientHandlerModule implements 
IgniteComponent {
         this.clusterId = clusterId;
         this.metrics = metrics;
         this.authenticationConfiguration = authenticationConfiguration;
+        this.clock = clock;
     }
 
     /** {@inheritDoc} */
@@ -184,7 +192,7 @@ public class TestClientHandlerModule implements 
IgniteComponent {
                                 new ResponseDelayHandler(responseDelay),
                                 new ClientInboundMessageHandler(
                                         (IgniteTablesInternal) ignite.tables(),
-                                        ignite.transactions(),
+                                        (IgniteTransactionsImpl) 
ignite.transactions(),
                                         mock(QueryProcessor.class),
                                         configuration,
                                         compute,
@@ -192,7 +200,8 @@ public class TestClientHandlerModule implements 
IgniteComponent {
                                         ignite.sql(),
                                         clusterId,
                                         metrics,
-                                        
authenticationManager(authenticationConfiguration)));
+                                        
authenticationManager(authenticationConfiguration),
+                                        clock));
                     }
                 })
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
configuration.connectTimeout());
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java 
b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 1aba74f063..3be82094ea 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -45,12 +45,15 @@ import 
org.apache.ignite.internal.configuration.ConfigurationRegistry;
 import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator;
 import 
org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
 import 
org.apache.ignite.internal.configuration.validation.TestConfigurationValidator;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManager;
 import 
org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NettyBootstrapFactory;
@@ -94,6 +97,33 @@ public class TestServer implements AutoCloseable {
                 null,
                 UUID.randomUUID(),
                 null,
+                null,
+                null
+        );
+    }
+
+    /**
+     * Constructor.
+     */
+    public TestServer(
+            long idleTimeout,
+            Ignite ignite,
+            @Nullable Function<Integer, Boolean> shouldDropConnection,
+            @Nullable Function<Integer, Integer> responseDelay,
+            @Nullable String nodeName,
+            UUID clusterId,
+            @Nullable AuthenticationConfiguration authenticationConfiguration,
+            @Nullable Integer port
+    ) {
+        this(
+                idleTimeout,
+                ignite,
+                shouldDropConnection,
+                responseDelay,
+                nodeName,
+                clusterId,
+                authenticationConfiguration,
+                port,
                 null
         );
     }
@@ -112,7 +142,8 @@ public class TestServer implements AutoCloseable {
             @Nullable String nodeName,
             UUID clusterId,
             @Nullable AuthenticationConfiguration authenticationConfiguration,
-            @Nullable Integer port
+            @Nullable Integer port,
+            @Nullable HybridClock clock
     ) {
         ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
 
@@ -156,6 +187,11 @@ public class TestServer implements AutoCloseable {
         AuthenticationConfiguration authenticationConfigToApply = 
authenticationConfiguration == null
                 ? mock(AuthenticationConfiguration.class)
                 : authenticationConfiguration;
+
+        if (clock == null) {
+            clock = new HybridClockImpl();
+        }
+
         module = shouldDropConnection != null
                 ? new TestClientHandlerModule(
                         ignite,
@@ -167,11 +203,12 @@ public class TestServer implements AutoCloseable {
                         compute,
                         clusterId,
                         metrics,
-                        authenticationConfigToApply)
+                        authenticationConfigToApply,
+                        clock)
                 : new ClientHandlerModule(
                         ((FakeIgnite) ignite).queryEngine(),
                         (IgniteTablesInternal) ignite.tables(),
-                        ignite.transactions(),
+                        (IgniteTransactionsImpl) ignite.transactions(),
                         cfg,
                         compute,
                         clusterService,
@@ -181,7 +218,8 @@ public class TestServer implements AutoCloseable {
                         mock(MetricManager.class),
                         metrics,
                         authenticationManager(authenticationConfigToApply),
-                        authenticationConfigToApply
+                        authenticationConfigToApply,
+                        clock
                         );
 
         module.start();
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index cd692c8726..c9eb41018e 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -18,26 +18,17 @@
 package org.apache.ignite.client.fakes;
 
 import java.util.Collection;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.sql.engine.QueryProcessor;
-import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxState;
-import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.manager.IgniteTables;
 import org.apache.ignite.tx.IgniteTransactions;
-import org.apache.ignite.tx.Transaction;
-import org.apache.ignite.tx.TransactionException;
-import org.apache.ignite.tx.TransactionOptions;
-import org.jetbrains.annotations.NotNull;
 
 /**
  * Fake Ignite.
@@ -47,6 +38,8 @@ public class FakeIgnite implements Ignite {
 
     private final HybridClock clock = new HybridClockImpl();
 
+    private final FakeTxManager txMgr = new FakeTxManager(clock);
+
     /**
      * Default constructor.
      */
@@ -79,91 +72,7 @@ public class FakeIgnite implements Ignite {
     /** {@inheritDoc} */
     @Override
     public IgniteTransactions transactions() {
-        return new IgniteTransactions() {
-            @Override
-            public Transaction begin(TransactionOptions options) {
-                return beginAsync(options).join();
-            }
-
-            @Override
-            public CompletableFuture<Transaction> 
beginAsync(TransactionOptions options) {
-                return CompletableFuture.completedFuture(new 
InternalTransaction() {
-                    private final UUID id = UUID.randomUUID();
-
-                    private final HybridTimestamp timestamp = clock.now();
-
-                    @Override
-                    public @NotNull UUID id() {
-                        return id;
-                    }
-
-                    @Override
-                    public IgniteBiTuple<ClusterNode, Long> 
enlistedNodeAndTerm(TablePartitionId tablePartitionId) {
-                        return null;
-                    }
-
-                    @Override
-                    public TxState state() {
-                        return null;
-                    }
-
-                    @Override
-                    public boolean assignCommitPartition(TablePartitionId 
tablePartitionId) {
-                        return false;
-                    }
-
-                    @Override
-                    public TablePartitionId commitPartition() {
-                        return null;
-                    }
-
-                    @Override
-                    public IgniteBiTuple<ClusterNode, Long> enlist(
-                            TablePartitionId tablePartitionId,
-                            IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
-                        return null;
-                    }
-
-                    @Override
-                    public void enlistResultFuture(CompletableFuture<?> 
resultFuture) {}
-
-                    @Override
-                    public void commit() throws TransactionException {
-
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> commitAsync() {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    @Override
-                    public void rollback() throws TransactionException {
-
-                    }
-
-                    @Override
-                    public CompletableFuture<Void> rollbackAsync() {
-                        return CompletableFuture.completedFuture(null);
-                    }
-
-                    @Override
-                    public boolean isReadOnly() {
-                        return false;
-                    }
-
-                    @Override
-                    public HybridTimestamp readTimestamp() {
-                        return null;
-                    }
-
-                    @Override
-                    public HybridTimestamp startTimestamp() {
-                        return timestamp;
-                    }
-                });
-            }
-        };
+        return new IgniteTransactionsImpl(txMgr);
     }
 
     /** {@inheritDoc} */
@@ -201,4 +110,8 @@ public class FakeIgnite implements Ignite {
     public String name() {
         return name;
     }
+
+    public FakeTxManager txManager() {
+        return txMgr;
+    }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
new file mode 100644
index 0000000000..11f33056c3
--- /dev/null
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.fakes;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Fake transaction manager.
+ */
+public class FakeTxManager implements TxManager {
+    private final HybridClock clock;
+
+    private HybridTimestamp lastObservableTimestamp = null;
+
+    public FakeTxManager(HybridClock clock) {
+        this.clock = clock;
+    }
+
+    @Override
+    public void start() {
+        // No-op.
+    }
+
+    @Override
+    public void stop() throws Exception {
+        // No-op.
+    }
+
+    @Override
+    public InternalTransaction begin() {
+        return begin(false, null);
+    }
+
+    @Override
+    public InternalTransaction begin(boolean readOnly, @Nullable 
HybridTimestamp observableTimestamp) {
+        lastObservableTimestamp = observableTimestamp;
+
+        return new InternalTransaction() {
+            private final UUID id = UUID.randomUUID();
+
+            private final HybridTimestamp timestamp = clock.now();
+
+            @Override
+            public @NotNull UUID id() {
+                return id;
+            }
+
+            @Override
+            public IgniteBiTuple<ClusterNode, Long> 
enlistedNodeAndTerm(TablePartitionId tablePartitionId) {
+                return null;
+            }
+
+            @Override
+            public TxState state() {
+                return null;
+            }
+
+            @Override
+            public boolean assignCommitPartition(TablePartitionId 
tablePartitionId) {
+                return false;
+            }
+
+            @Override
+            public TablePartitionId commitPartition() {
+                return null;
+            }
+
+            @Override
+            public IgniteBiTuple<ClusterNode, Long> enlist(
+                    TablePartitionId tablePartitionId,
+                    IgniteBiTuple<ClusterNode, Long> nodeAndTerm) {
+                return null;
+            }
+
+            @Override
+            public void enlistResultFuture(CompletableFuture<?> resultFuture) {
+            }
+
+            @Override
+            public void commit() throws TransactionException {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> commitAsync() {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public void rollback() throws TransactionException {
+
+            }
+
+            @Override
+            public CompletableFuture<Void> rollbackAsync() {
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override
+            public boolean isReadOnly() {
+                return false;
+            }
+
+            @Override
+            public HybridTimestamp readTimestamp() {
+                return observableTimestamp;
+            }
+
+            @Override
+            public HybridTimestamp startTimestamp() {
+                return timestamp;
+            }
+        };
+    }
+
+    @Override
+    public @Nullable TxState state(UUID txId) {
+        return null;
+    }
+
+    @Override
+    public void changeState(UUID txId, @Nullable TxState before, TxState 
after) {
+
+    }
+
+    @Override
+    public LockManager lockManager() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> finish(TablePartitionId commitPartition, 
ClusterNode recipientNode, Long term, boolean commit,
+            Map<ClusterNode, List<IgniteBiTuple<TablePartitionId, Long>>> 
groups, UUID txId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> cleanup(ClusterNode recipientNode, 
List<IgniteBiTuple<TablePartitionId, Long>> tablePartitionIds,
+            UUID txId, boolean commit, @Nullable HybridTimestamp 
commitTimestamp) {
+        return null;
+    }
+
+    @Override
+    public int finished() {
+        return 0;
+    }
+
+    @Override
+    public int pending() {
+        return 0;
+    }
+
+    @Override
+    public CompletableFuture<Void> updateLowWatermark(HybridTimestamp 
newLowWatermark) {
+        return null;
+    }
+
+    public @Nullable HybridTimestamp lastObservableTimestamp() {
+        return lastObservableTimestamp;
+    }
+}
diff --git 
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
 
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index 6a681d8e26..63041b71f2 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -204,11 +204,16 @@ public class RepeatedFinishClientTransactionTest extends 
BaseIgniteAbstractTest
 
         @Override
         public void 
addTopologyAssignmentChangeListener(Consumer<ClientChannel> listener) {
+            // No-op.
+        }
 
+        @Override
+        public void addObservableTimestampListener(Consumer<Long> listener) {
+            // No-op.
         }
 
         @Override
-        public void close() throws Exception {
+        public void close() {
 
         }
     }
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp 
b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index e90ad52ecb..1c9972cbd2 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -91,6 +91,9 @@ void node_connection::process_message(bytes_view msg) {
     auto flags = reader.read_int32();
     UNUSED_VALUE flags; // Flags are unused for now.
 
+    auto observable_timestamp = reader.read_int64();
+    UNUSED_VALUE observable_timestamp; // TODO IGNITE-20057 C++ client: 
Track observable timestamp
+
     auto handler = get_and_remove_handler(req_id);
     if (!handler) {
         m_logger->log_error("Missing handler for request with id=" + 
std::to_string(req_id));
diff --git a/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp 
b/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp
index 39e4eff4c5..10a36d2257 100644
--- a/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/sql/sql_impl.cpp
@@ -65,27 +65,29 @@ void sql_impl::execute_async(transaction *tx, const 
sql_statement &statement, st
 
         if (args.empty()) {
             writer.write_nil();
-            return;
-        }
+        } else {
+            auto args_num = std::int32_t(args.size());
 
-        auto args_num = std::int32_t(args.size());
+            writer.write(args_num);
 
-        writer.write(args_num);
+            binary_tuple_builder args_builder{args_num * 3};
 
-        binary_tuple_builder args_builder{args_num * 3};
+            args_builder.start();
+            for (const auto &arg : args) {
+                protocol::claim_primitive_with_type(args_builder, arg);
+            }
 
-        args_builder.start();
-        for (const auto &arg : args) {
-            protocol::claim_primitive_with_type(args_builder, arg);
-        }
+            args_builder.layout();
+            for (const auto &arg : args) {
+                protocol::append_primitive_with_type(args_builder, arg);
+            }
 
-        args_builder.layout();
-        for (const auto &arg : args) {
-            protocol::append_primitive_with_type(args_builder, arg);
+            auto args_data = args_builder.build();
+            writer.write_binary(args_data);
         }
 
-        auto args_data = args_builder.build();
-        writer.write_binary(args_data);
+        // TODO IGNITE-20057 C++ client: Track observable timestamp
+        writer.write(0); // observableTimestamp.
     };
 
     auto reader_func = [](std::shared_ptr<node_connection> channel, bytes_view 
msg) -> result_set {
diff --git 
a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h 
b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
index 1b1aa3c943..98f792367c 100644
--- a/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/transaction/transactions_impl.h
@@ -57,6 +57,9 @@ public:
     IGNITE_API void begin_async(ignite_callback<transaction> callback) {
         auto writer_func = [](protocol::writer &writer) {
             writer.write_bool(false); // readOnly.
+
+            // TODO IGNITE-20057 C++ client: Track observable timestamp
+            writer.write(0); // observableTimestamp.
         };
 
         auto reader_func = [](protocol::reader &reader, 
std::shared_ptr<node_connection> conn) mutable -> transaction {
diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp 
b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
index f59b116eba..9d039567c0 100644
--- a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
+++ b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
@@ -344,6 +344,9 @@ sql_result data_query::make_request_execute() {
             writer.write(m_query);
 
             m_params.write(writer);
+
+            // TODO IGNITE-20057 C++ client: Track observable timestamp
+            writer.write(0); // observableTimestamp.
         });
 
         m_connection.mark_transaction_non_empty();
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp 
b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
index 81b14139d9..9b3130da7d 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
@@ -325,6 +325,9 @@ network::data_buffer_owning 
sql_connection::receive_message(std::int64_t id, std
         auto flags = reader.read_int32();
         UNUSED_VALUE flags; // Flags are unused for now.
 
+        auto observable_timestamp = reader.read_int64();
+        UNUSED_VALUE observable_timestamp; // TODO IGNITE-20057 C++ client: 
Track observable timestamp
+
         auto err = protocol::read_error(reader);
         if (err) {
             throw odbc_error(sql_state::SHY000_GENERAL_ERROR, 
err.value().what_str());
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 96dadabeff..81c668e199 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -339,6 +339,7 @@ namespace Apache.Ignite.Tests
             writer.Write(0); // Message type.
             writer.Write(requestId);
             writer.Write(PartitionAssignmentChanged ? 
(int)ResponseFlags.PartitionAssignmentChanged : 0);
+            writer.Write(0); // Observable timestamp.
 
             if (!isError)
             {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
index 5f0258b066..501fb2e362 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/MetricsTests.cs
@@ -97,7 +97,7 @@ public class MetricsTests
         await client.Tables.GetTablesAsync();
 
         Assert.AreEqual(17, _listener.GetMetric("bytes-sent"));
-        Assert.AreEqual(72, _listener.GetMetric("bytes-received"));
+        Assert.AreEqual(73, _listener.GetMetric("bytes-received"));
     }
 
     [Test]
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs
index 10703c8d9f..8034286d48 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorGroups.cs
@@ -17,7 +17,6 @@
 
 namespace Apache.Ignite
 {
-    using System;
     using System.Diagnostics.CodeAnalysis;
 
     /// <summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index f9319fc3d7..d61dd176e0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -700,6 +700,9 @@ namespace Apache.Ignite.Internal
                 _assignmentChangeCallback(this);
             }
 
+            // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
+            _ = reader.ReadInt64();
+
             var exception = ReadError(ref reader);
 
             if (exception != null)
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
index ad8a47ec5b..4476eab85b 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackWriter.cs
@@ -419,6 +419,11 @@ internal readonly ref struct MsgPackWriter
 
         Write(col.Count);
 
+        if (col.Count == 0)
+        {
+            return;
+        }
+
         using var builder = new BinaryTupleBuilder(col.Count * 3);
 
         foreach (var obj in col)
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
index 4d93e33270..2d23d8b4d3 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Sql/Sql.cs
@@ -190,6 +190,9 @@ namespace Apache.Ignite.Internal.Sql
                 w.Write(statement.Query);
                 w.WriteObjectCollectionAsBinaryTuple(args);
 
+                // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
+                w.Write(0);
+
                 return writer;
             }
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
index 2e875a603d..31129c944c 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Transactions/Transactions.cs
@@ -43,7 +43,7 @@ namespace Apache.Ignite.Internal.Transactions
         public async Task<ITransaction> BeginAsync(TransactionOptions options)
         {
             using var writer = ProtoCommon.GetMessageWriter();
-            writer.MessageWriter.Write(options.ReadOnly);
+            Write();
 
             // Transaction and all corresponding operations must be performed 
using the same connection.
             var (resBuf, socket) = await 
_socket.DoOutInOpAndGetSocketAsync(ClientOp.TxBegin, request: 
writer).ConfigureAwait(false);
@@ -54,6 +54,15 @@ namespace Apache.Ignite.Internal.Transactions
 
                 return new Transaction(txId, socket, _socket, 
options.ReadOnly);
             }
+
+            void Write()
+            {
+                var w = writer.MessageWriter;
+                w.Write(options.ReadOnly);
+
+                // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
+                w.Write(0);
+            }
         }
 
         /// <inheritdoc />
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index de5e7e783a..27eaf4004f 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -605,7 +605,8 @@ public class IgniteImpl implements Ignite {
                 metricManager,
                 new ClientHandlerMetricSource(),
                 authenticationManager,
-                authenticationConfiguration
+                authenticationConfiguration,
+                clock
                 );
 
         restComponent = createRestComponent(name);
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 58dc4f93f0..074d54a197 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -55,7 +55,7 @@ public interface TxManager extends IgniteComponent {
      * @throws IgniteInternalException with {@link 
Transactions#TX_READ_ONLY_TOO_OLD_ERR} if transaction much older than the data 
available
      *      in the tables.
      */
-    InternalTransaction begin(boolean readOnly, HybridTimestamp 
observableTimestamp);
+    InternalTransaction begin(boolean readOnly, @Nullable HybridTimestamp 
observableTimestamp);
 
     /**
      * Returns a transaction state.
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
index a4b8ea297f..bb67f6232b 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.tx.impl;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.apache.ignite.tx.Transaction;
@@ -39,25 +41,34 @@ public class IgniteTransactionsImpl implements 
IgniteTransactions {
         this.txManager = txManager;
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public Transaction begin(@Nullable TransactionOptions options) {
+    /**
+     * Begins a transaction.
+     *
+     * @param options Transaction options.
+     * @param observableTimestamp Observable timestamp, applicable only to 
read-only transactions. A read-only transaction
+     *      can use a time in the past to avoid waiting for a time that is 
safe for reading on a non-primary replica. To do so, the client
+     *      should provide this observable timestamp, which is calculated 
according to the commit time of the latest read-write transaction,
+     *      to guarantee that the read-only transaction will see the modified data.
+     * @return The started transaction.
+     */
+    public InternalTransaction begin(@Nullable TransactionOptions options, 
@Nullable HybridTimestamp observableTimestamp) {
         if (options != null && options.timeoutMillis() != 0) {
             // TODO: IGNITE-15936.
             throw new UnsupportedOperationException("Timeouts are not 
supported yet");
         }
 
-        return txManager.begin(options != null && options.readOnly(), null);
+        return txManager.begin(options != null && options.readOnly(), 
observableTimestamp);
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Transaction> beginAsync(@Nullable 
TransactionOptions options) {
-        if (options != null && options.timeoutMillis() != 0) {
-            // TODO: IGNITE-15936.
-            throw new UnsupportedOperationException("Timeouts are not 
supported yet");
-        }
+    public Transaction begin(@Nullable TransactionOptions options) {
+        return begin(options, null);
+    }
 
-        return CompletableFuture.completedFuture(txManager.begin(options != 
null && options.readOnly(), null));
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Transaction> beginAsync(@Nullable 
TransactionOptions options) {
+        return CompletableFuture.completedFuture(begin(options, null));
     }
 }
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index f60d5720dd..f1147dd08c 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -117,7 +117,7 @@ public class TxManagerImpl implements TxManager {
     }
 
     @Override
-    public InternalTransaction begin(boolean readOnly, HybridTimestamp 
observableTimestamp) {
+    public InternalTransaction begin(boolean readOnly, @Nullable 
HybridTimestamp observableTimestamp) {
         assert readOnly || observableTimestamp == null : "Observable timestamp 
is applicable just for read-only transactions.";
 
         HybridTimestamp beginTimestamp = clock.now();


Reply via email to