This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 14429f22394 IGNITE-25718 Propagate observableTs to compute jobs (#7411)
14429f22394 is described below

commit 14429f22394186912e3ae1d811143d78c6e03336
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Jan 16 14:41:48 2026 +0200

    IGNITE-25718 Propagate observableTs to compute jobs (#7411)
    
    Send observable timestamp with the compute job to ensure that the job sees
    all changes made by the client.
---
 .../client/proto/ClientComputeJobPacker.java       | 31 ++++++++++++++----
 .../client/proto/ClientComputeJobUnpacker.java     | 24 ++++++++++----
 .../client/proto/ProtocolBitmaskFeature.java       |  7 +++-
 .../internal/compute/ComputeJobDataHolder.java     | 12 +------
 .../proto/ClientComputeJobPackerUnpackerTest.java  |  4 +--
 .../ignite/client/handler/ItClientHandlerTest.java |  1 +
 .../ignite/client/handler/ClientHandlerModule.java |  3 +-
 .../handler/ClientInboundMessageHandler.java       |  4 +--
 .../ClientComputeExecuteColocatedRequest.java      |  5 ++-
 .../ClientComputeExecuteMapReduceRequest.java      |  5 ++-
 .../ClientComputeExecutePartitionedRequest.java    |  5 ++-
 .../compute/ClientComputeExecuteRequest.java       |  5 ++-
 .../ignite/internal/client/TcpClientChannel.java   |  3 +-
 .../internal/client/compute/ClientCompute.java     | 30 +++++++++++-------
 .../ignite/internal/compute/ItComputeBaseTest.java | 37 ++++++++++++++++++++++
 .../internal/compute/ItComputeTestClient.java      |  9 ++++++
 .../ignite/internal/compute/utils/Clients.java     | 16 +++++++---
 .../internal/compute/ComputeComponentImpl.java     |  9 +++++-
 .../ignite/internal/compute/ExecutionContext.java  | 19 +++++++++++
 .../ignite/internal/compute/IgniteComputeImpl.java |  6 ++--
 .../internal/compute/ComputeComponentImplTest.java |  4 ++-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  3 +-
 22 files changed, 188 insertions(+), 54 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
index b7442ad323b..ddeb6f5d806 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
@@ -33,10 +33,15 @@ public final class ClientComputeJobPacker {
      * @param arg Argument.
      * @param marshaller Marshaller.
      * @param packer Packer.
+     * @param observableTs Observable timestamp. Not packed when null.
      * @param <T> Argument type.
      */
-    public static <T> void packJobArgument(@Nullable T arg, @Nullable 
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
-        pack(arg, marshaller, packer);
+    public static <T> void packJobArgument(
+            @Nullable T arg,
+            @Nullable Marshaller<T, byte[]> marshaller,
+            ClientMessagePacker packer,
+            @Nullable Long observableTs) {
+        pack(arg, marshaller, packer, observableTs);
     }
 
     /**
@@ -49,7 +54,7 @@ public final class ClientComputeJobPacker {
      * @param <T> Result type.
      */
     public static <T> void packJobResult(@Nullable T res, @Nullable 
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
-        pack(res, marshaller, packer);
+        pack(res, marshaller, packer, null);
     }
 
     /**
@@ -61,8 +66,14 @@ public final class ClientComputeJobPacker {
      * @param arg Job argument.
      * @param platformComputeSupported Whether platform compute is supported.
      * @param w Packer.
+     * @param observableTs Observable timestamp. Not packed when null.
      */
-    public static <T, R> void packJob(JobDescriptor<T, R> descriptor, T arg, 
boolean platformComputeSupported, ClientMessagePacker w) {
+    public static <T, R> void packJob(
+            JobDescriptor<T, R> descriptor,
+            T arg,
+            boolean platformComputeSupported,
+            ClientMessagePacker w,
+            @Nullable Long observableTs) {
         w.packDeploymentUnits(descriptor.units());
 
         w.packString(descriptor.jobClassName());
@@ -77,15 +88,23 @@ public final class ClientComputeJobPacker {
             throw new IllegalArgumentException("Custom job executors are not 
supported by the server: " + executorType);
         }
 
-        packJobArgument(arg, descriptor.argumentMarshaller(), w);
+        packJobArgument(arg, descriptor.argumentMarshaller(), w, observableTs);
     }
 
     /** Packs object in the format: | typeId | value |. */
-    private static <T> void pack(@Nullable T obj, @Nullable Marshaller<T, 
byte[]> marshaller, ClientMessagePacker packer) {
+    private static <T> void pack(
+            @Nullable T obj,
+            @Nullable Marshaller<T, byte[]> marshaller,
+            ClientMessagePacker packer,
+            @Nullable Long observableTs) {
         ComputeJobDataHolder holder = obj instanceof ComputeJobDataHolder
                 ? (ComputeJobDataHolder) obj
                 : SharedComputeUtils.marshalArgOrResult(obj, marshaller);
 
+        if (observableTs != null) {
+            packer.packLong(observableTs);
+        }
+
         if (holder.data() == null) {
             packer.packNil();
             return;
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
index 05f4e293422..c049baf0c0a 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
@@ -25,6 +25,7 @@ import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.ComputeJobDataType;
 import org.apache.ignite.internal.compute.SharedComputeUtils;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.marshalling.UnmarshallingException;
 import org.jetbrains.annotations.Nullable;
@@ -46,15 +47,23 @@ public final class ClientComputeJobUnpacker {
             @Nullable Marshaller<?, byte[]> marshaller,
             @Nullable Class<?> resultClass
     ) {
-        ComputeJobDataHolder holder = 
unpackJobArgumentWithoutMarshaller(unpacker);
+        ComputeJobDataHolder holder = 
unpackJobArgumentWithoutMarshaller(unpacker, false);
 
         return SharedComputeUtils.unmarshalArgOrResult(holder, marshaller, 
resultClass);
     }
 
     /** Unpacks compute job argument without marshaller. */
-    public static @Nullable ComputeJobDataHolder 
unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {
+    public static @Nullable ComputeJobDataHolder 
unpackJobArgumentWithoutMarshaller(
+            ClientMessageUnpacker unpacker,
+            boolean enableObservableTs) {
+        long observableTs = enableObservableTs
+                ? unpacker.unpackLong()
+                : HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+
         if (unpacker.tryUnpackNil()) {
-            return null;
+            return observableTs == HybridTimestamp.NULL_HYBRID_TIMESTAMP
+                    ? null
+                    : new ComputeJobDataHolder(ComputeJobDataType.NATIVE, 
null, observableTs);
         }
 
         int typeId = unpacker.unpackInt();
@@ -63,11 +72,14 @@ public final class ClientComputeJobUnpacker {
             throw new UnmarshallingException("Unsupported compute job type id: 
" + typeId);
         }
 
-        return new ComputeJobDataHolder(type, unpacker.readBinary());
+        return new ComputeJobDataHolder(type, unpacker.readBinary(), 
observableTs);
     }
 
     /** Unpacks compute job info. */
-    public static Job unpackJob(ClientMessageUnpacker unpacker, boolean 
enablePlatformJobs) {
+    public static Job unpackJob(
+            ClientMessageUnpacker unpacker,
+            boolean enablePlatformJobs,
+            boolean enableObservableTs) {
         List<DeploymentUnit> deploymentUnits = 
unpacker.unpackDeploymentUnits();
         String jobClassName = unpacker.unpackString();
         var options = 
JobExecutionOptions.builder().priority(unpacker.unpackInt()).maxRetries(unpacker.unpackInt());
@@ -76,7 +88,7 @@ public final class ClientComputeJobUnpacker {
             
options.executorType(JobExecutorType.fromOrdinal(unpacker.unpackInt()));
         }
 
-        ComputeJobDataHolder args = 
unpackJobArgumentWithoutMarshaller(unpacker);
+        ComputeJobDataHolder args = 
unpackJobArgumentWithoutMarshaller(unpacker, enableObservableTs);
 
         return new Job(deploymentUnits, jobClassName, options.build(), args);
     }
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 858ccccbc20..8a4caa9a0ef 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -92,7 +92,12 @@ public enum ProtocolBitmaskFeature {
     /**
      * Thin SQL client supports iteration over the results of script execution.
      */
-    SQL_MULTISTATEMENT_SUPPORT(13);
+    SQL_MULTISTATEMENT_SUPPORT(13),
+
+    /**
+     * Compute tasks and jobs accept observable timestamp from the client.
+     */
+    COMPUTE_OBSERVABLE_TS(14);
 
     private static final EnumSet<ProtocolBitmaskFeature> 
ALL_FEATURES_AS_ENUM_SET =
             EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
index 1c570970c36..fcfa0e72cfb 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
@@ -33,16 +33,6 @@ public class ComputeJobDataHolder {
 
     private final @Nullable Long observableTimestamp;
 
-    /**
-     * Constructs a holder.
-     *
-     * @param type Job argument type.
-     * @param data Marshalled data.
-     */
-    public ComputeJobDataHolder(ComputeJobDataType type, byte @Nullable [] 
data) {
-        this(type, data, null);
-    }
-
     /**
      * Constructs a holder.
      *
@@ -69,7 +59,7 @@ public class ComputeJobDataHolder {
      *
      * @return Marshalled data.
      */
-    public byte[] data() {
+    public byte @Nullable [] data() {
         return data;
     }
 
diff --git 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
index aabf9ff3034..893608f8602 100644
--- 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
+++ 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
@@ -145,12 +145,12 @@ class ClientComputeJobPackerUnpackerTest {
     @ParameterizedTest
     void notMarshalledArgument(Object arg, ComputeJobDataType type) {
         // When pack job argument without marshaller.
-        packJobArgument(arg, null, messagePacker);
+        packJobArgument(arg, null, messagePacker, null);
         byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
 
         // And unpack without marshaller.
         try (var messageUnpacker = messageUnpacker(data)) {
-            var res = unpackJobArgumentWithoutMarshaller(messageUnpacker);
+            var res = unpackJobArgumentWithoutMarshaller(messageUnpacker, 
false);
 
             // Then argument is unpacked but not unmarshalled.
             ComputeJobDataHolder argument = 
assertInstanceOf(ComputeJobDataHolder.class, res);
diff --git 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 91c19c741e6..b8ad0f0e118 100644
--- 
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ 
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -559,6 +559,7 @@ public class ItClientHandlerTest extends 
BaseIgniteAbstractTest {
             expected.set(11);
             expected.set(12);
             expected.set(13);
+            expected.set(14);
             assertEquals(expected, supportedFeatures);
 
             var extensionsLen = unpacker.unpackInt();
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 4de86ddb0e6..9389c76898a 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -97,7 +97,8 @@ public class ClientHandlerModule implements IgniteComponent, 
PlatformComputeTran
             ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS,
             ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
             ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
-            ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT
+            ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
+            ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
     ));
 
     /** Connection id generator.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 129bbe60ffe..a15077c7573 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -1414,11 +1414,11 @@ public class ClientInboundMessageHandler
                         packer.packString(jobClassName);
                         packDeploymentUnitPaths(ctx.deploymentUnits(), packer);
                         packer.packBoolean(false); // Retain deployment units 
in cache.
-                        ClientComputeJobPacker.packJobArgument(arg, null, 
packer);
+                        ClientComputeJobPacker.packJobArgument(arg, null, 
packer, null);
                     })
                     .thenApply(unpacker -> {
                         try (unpacker) {
-                            return 
ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller(unpacker);
+                            return 
ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller(unpacker, false);
                         }
                     });
         }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index f2bb27b1e57..14ba2b050fd 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
 import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
 import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
 
@@ -72,7 +73,9 @@ public class ClientComputeExecuteColocatedRequest {
         BitSet noValueSet = in.unpackBitSet();
         byte[] tupleBytes = in.readBinary();
 
-        Job job = unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+        boolean enablePlatformJobs = 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB);
+        boolean enableObservableTs = 
clientContext.hasFeature(COMPUTE_OBSERVABLE_TS);
+        Job job = unpackJob(in, enablePlatformJobs, enableObservableTs);
         unpackTaskId(in, clientContext.hasFeature(COMPUTE_TASK_ID)); // 
Placeholder for a possible future usage
 
         return readTableAsync(tableId, tables).thenCompose(table -> 
readTuple(schemaId, noValueSet, tupleBytes, table, true)
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index ccecbf120bc..08a4bac84d9 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.HybridTimestampProvider;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
@@ -70,7 +71,9 @@ public class ClientComputeExecuteMapReduceRequest {
     ) {
         List<DeploymentUnit> deploymentUnits = in.unpackDeploymentUnits();
         String taskClassName = in.unpackString();
-        ComputeJobDataHolder arg = unpackJobArgumentWithoutMarshaller(in);
+
+        boolean enableObservableTs = 
clientContext.hasFeature(ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS);
+        ComputeJobDataHolder arg = unpackJobArgumentWithoutMarshaller(in, 
enableObservableTs);
 
         TaskDescriptor<Object, Object> taskDescriptor = 
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build();
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
index 358d721bae6..e5ee64bf916 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
@@ -22,6 +22,7 @@ import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExe
 import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
 import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
 import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
 
@@ -67,7 +68,9 @@ public class ClientComputeExecutePartitionedRequest {
         int tableId = in.unpackInt();
         int partitionId = in.unpackInt();
 
-        Job job = unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+        boolean enablePlatformJobs = 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB);
+        boolean enableObservableTs = 
clientContext.hasFeature(COMPUTE_OBSERVABLE_TS);
+        Job job = unpackJob(in, enablePlatformJobs, enableObservableTs);
         UUID taskId = unpackTaskId(in, 
clientContext.hasFeature(COMPUTE_TASK_ID));
 
         return readTableAsync(tableId, tables).thenCompose(table -> {
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 15a58a1e667..ed925e27a24 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.client.handler.requests.cluster.ClientClusterGet
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStateRequest.packJobState;
 import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
 import static 
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
+import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
 import static 
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
@@ -79,7 +80,9 @@ public class ClientComputeExecuteRequest {
     ) {
         Set<InternalClusterNode> candidates = unpackCandidateNodes(in, 
cluster);
 
-        Job job = unpackJob(in, 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+        boolean enablePlatformJobs = 
clientContext.hasFeature(PLATFORM_COMPUTE_JOB);
+        boolean enableObservableTs = 
clientContext.hasFeature(COMPUTE_OBSERVABLE_TS);
+        Job job = unpackJob(in, enablePlatformJobs, enableObservableTs);
         UUID taskId = unpackTaskId(in, 
clientContext.hasFeature(COMPUTE_TASK_ID));
 
         ComputeEventMetadataBuilder metadataBuilder = 
ComputeEventMetadata.builder(taskId != null ? Type.BROADCAST : Type.SINGLE)
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 30a6968d9ae..b8ce92e43c5 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -98,7 +98,8 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
             ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS,
             ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
             ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
-            ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT
+            ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
+            ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
     ));
 
     /** Minimum supported heartbeat interval. */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index bd512b53d98..fa258e857cf 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -317,7 +317,7 @@ public class ClientCompute implements IgniteCompute {
     private <T, R> CompletableFuture<SubmitTaskResult> 
doExecuteMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
         return ch.serviceAsync(
                 ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
-                w -> packTask(w.out(), taskDescriptor, arg),
+                w -> packTask(w, taskDescriptor, arg),
                 ClientCompute::unpackSubmitTaskResult,
                 (String) null,
                 null,
@@ -362,7 +362,7 @@ public class ClientCompute implements IgniteCompute {
         return iterator.next();
     }
 
-    private static <K, T, R> CompletableFuture<SubmitResult> 
executeColocatedObjectKey(
+    private <K, T, R> CompletableFuture<SubmitResult> 
executeColocatedObjectKey(
             ClientTable t,
             K key,
             Mapper<K> keyMapper,
@@ -379,7 +379,7 @@ public class ClientCompute implements IgniteCompute {
         );
     }
 
-    private static <T, R> CompletableFuture<SubmitResult> 
executeColocatedTupleKey(
+    private <T, R> CompletableFuture<SubmitResult> executeColocatedTupleKey(
             ClientTable t,
             Tuple key,
             JobDescriptor<T, R> descriptor,
@@ -395,7 +395,7 @@ public class ClientCompute implements IgniteCompute {
         );
     }
 
-    private static <T, R> CompletableFuture<SubmitResult> 
executeColocatedInternal(
+    private <T, R> CompletableFuture<SubmitResult> executeColocatedInternal(
             ClientTable t,
             BiConsumer<PayloadOutputChannel, ClientSchema> keyWriter,
             PartitionAwarenessProvider partitionAwarenessProvider,
@@ -438,7 +438,7 @@ public class ClientCompute implements IgniteCompute {
                 .thenCompose(Function.identity()));
     }
 
-    private static <T, R> CompletableFuture<SubmitResult> executePartitioned(
+    private <T, R> CompletableFuture<SubmitResult> executePartitioned(
             ClientTable t,
             Partition partition,
             JobDescriptor<T, R> descriptor,
@@ -520,11 +520,11 @@ public class ClientCompute implements IgniteCompute {
         }
     }
 
-    private static <T, R> void packJob(PayloadOutputChannel out, 
JobDescriptor<T, R> descriptor, T arg) {
+    private <T, R> void packJob(PayloadOutputChannel out, JobDescriptor<T, R> 
descriptor, T arg) {
         boolean platformComputeSupported = 
out.clientChannel().protocolContext()
                 
.isFeatureSupported(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB);
 
-        ClientComputeJobPacker.packJob(descriptor, arg, 
platformComputeSupported, out.out());
+        ClientComputeJobPacker.packJob(descriptor, arg, 
platformComputeSupported, out.out(), getJobObservableTs(out));
     }
 
     private static void packTaskId(PayloadOutputChannel out, @Nullable UUID 
taskId) {
@@ -533,10 +533,18 @@ public class ClientCompute implements IgniteCompute {
         }
     }
 
-    private static <T, R> void packTask(ClientMessagePacker w, 
TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
-        w.packDeploymentUnits(taskDescriptor.units());
-        w.packString(taskDescriptor.taskClassName());
-        ClientComputeJobPacker.packJobArgument(arg, 
taskDescriptor.splitJobArgumentMarshaller(), w);
+    private <T, R> void packTask(PayloadOutputChannel w, TaskDescriptor<T, R> 
taskDescriptor, @Nullable T arg) {
+        w.out().packDeploymentUnits(taskDescriptor.units());
+        w.out().packString(taskDescriptor.taskClassName());
+        ClientComputeJobPacker.packJobArgument(arg, 
taskDescriptor.splitJobArgumentMarshaller(), w.out(), getJobObservableTs(w));
+    }
+
+    private @Nullable Long getJobObservableTs(PayloadOutputChannel w) {
+        if 
(!w.clientChannel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS))
 {
+            return null;
+        }
+
+        return ch.observableTimestamp().getLong();
     }
 
     /**
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 8538d0ec88a..4bbb4c75626 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.compute.JobStatus.CANCELED;
@@ -26,6 +27,7 @@ import static org.apache.ignite.compute.JobStatus.FAILED;
 import static org.apache.ignite.compute.JobStatus.QUEUED;
 import static org.apache.ignite.internal.IgniteExceptionTestUtils.hasMessage;
 import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.traceableException;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -52,6 +54,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.ArrayList;
@@ -70,9 +73,11 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.BroadcastExecution;
 import org.apache.ignite.compute.BroadcastJobTarget;
 import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.compute.TaskDescriptor;
@@ -80,6 +85,7 @@ import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.ConfigOverride;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.lang.CancellationToken;
@@ -898,6 +904,30 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest {
         }
     }
 
+    @Test
+    public void observableTsIsPropagatedToTargetNode() {
+        // Bump observable timestamp.
+        createTestTableWithOneRow();
+
+        HybridTimestamp localObservableTs = currentObservableTimestamp();
+        assertNotNull(localObservableTs);
+
+        JobExecution<Long> execution = submit(
+                JobTarget.node(clusterNode(node(1))),
+                JobDescriptor.builder(ObservableTimestampJob.class).units(units()).build(),
+                null
+        );
+
+        Long jobRes = execution.resultAsync().join();
+        HybridTimestamp jobObservableTs = HybridTimestamp.nullableHybridTimestamp(jobRes);
+
+        assertThat(jobObservableTs, is(localObservableTs));
+    }
+
+    protected @Nullable HybridTimestamp currentObservableTimestamp() {
+        return unwrapIgniteImpl(node(0)).observableTimeTracker().get();
+    }
+
     static Class<ToStringJob> toStringJobClass() {
         return ToStringJob.class;
     }
@@ -933,4 +963,11 @@ public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest {
                                 .or(instanceOf(CancellationException.class))
                 );
     }
+
+    private static class ObservableTimestampJob implements ComputeJob<Object, Long> {
+        @Override
+        public CompletableFuture<Long> executeAsync(JobExecutionContext context, Object arg) {
+            return completedFuture(unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong());
+        }
+    }
 }
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java
index df96f7eb69a..1502718c99f 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.compute;
 
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.internal.compute.utils.Clients;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 
 /**
@@ -42,4 +45,10 @@ public class ItComputeTestClient extends ItComputeTestEmbedded {
     void cancelsNotCancellableJob(boolean local) {
         // No-op. Embedded-specific.
     }
+
+    @Override
+    protected @Nullable HybridTimestamp currentObservableTimestamp() {
+        TcpIgniteClient client = (TcpIgniteClient) clients.client(node(0));
+        return client.channel().observableTimestamp().get();
+    }
 }
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java
index dd889a5e402..b829d945a4c 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java
@@ -31,6 +31,17 @@ import org.apache.ignite.compute.IgniteCompute;
 public class Clients {
     private final Map<String, IgniteClient> clients = new HashMap<>();
 
+    /**
+     * Gets thin client referenced by the embedded node.
+     *
+     * @param node Node to get the client for.
+     * @return Thin client.
+     */
+    public Ignite client(Ignite node) {
+        String address = "127.0.0.1:" + unwrapIgniteImpl(node).clientAddress().port();
+        return clients.computeIfAbsent(address, addr -> IgniteClient.builder().addresses(addr).build());
+    }
+
     /**
      * Gets compute API for the thin client referenced by the embedded node.
      *
@@ -38,10 +49,7 @@ public class Clients {
      * @return Compute API.
      */
     public IgniteCompute compute(Ignite node) {
-        String address = "127.0.0.1:" + unwrapIgniteImpl(node).clientAddress().port();
-        //noinspection resource
-        IgniteClient client = clients.computeIfAbsent(address, addr -> IgniteClient.builder().addresses(addr).build());
-        return client.compute();
+        return client(node).compute();
     }
 
     /**
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 1eea4f59e18..ab6e966b019 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
 import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -89,6 +90,8 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide
 
     private final EventLog eventLog;
 
+    private final HybridTimestampTracker observableTimestampTracker;
+
     private final ComputeMessaging messaging;
 
     private final ExecutionManager executionManager;
@@ -108,13 +111,15 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide
             UnitsContextManager jobContextManager,
             ComputeExecutor executor,
             ComputeConfiguration computeConfiguration,
-            EventLog eventLog
+            EventLog eventLog,
+            HybridTimestampTracker observableTimestampTracker
     ) {
         this.topologyService = topologyService;
         this.logicalTopologyService = logicalTopologyService;
         this.jobContextManager = jobContextManager;
         this.executor = executor;
         this.eventLog = eventLog;
+        this.observableTimestampTracker = observableTimestampTracker;
         executionManager = new ExecutionManager(computeConfiguration, topologyService);
         messaging = new ComputeMessaging(executionManager, messagingService, topologyService);
         failoverExecutor = Executors.newSingleThreadExecutor(
@@ -132,6 +137,8 @@ public class ComputeComponentImpl implements ComputeComponent, SystemViewProvide
         }
 
         try {
+            observableTimestampTracker.update(executionContext.observableTimestamp());
+
             CompletableFuture<UnitsClassLoaderContext> classLoaderFut =
                     jobContextManager.acquireClassLoader(executionContext.units(), executionContext.jobClassName());
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
index 5a97a13d26d..a65a610eb00 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
@@ -22,6 +22,7 @@ import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -115,4 +116,22 @@ public class ExecutionContext {
     public ComputeJobDataHolder arg() {
         return arg;
     }
+
+    /**
+     * Gets the observable timestamp from the job initiator client.
+     * This ensures that the job sees the changes made by the client up to the point of job submission.
+     *
+     * @return Observable timestamp or {@link HybridTimestamp#NULL_HYBRID_TIMESTAMP} if not set.
+     */
+    public long observableTimestamp() {
+        if (arg == null) {
+            return HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+        }
+
+        Long ts = arg.observableTimestamp();
+
+        return ts == null
+                ? HybridTimestamp.NULL_HYBRID_TIMESTAMP
+                : ts;
+    }
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index c95f63a986b..8c48e308eac 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -157,7 +157,9 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
         Objects.requireNonNull(target);
         Objects.requireNonNull(descriptor);
 
-        ComputeJobDataHolder argHolder = SharedComputeUtils.marshalArgOrResult(arg, descriptor.argumentMarshaller());
+        ComputeJobDataHolder argHolder = SharedComputeUtils.marshalArgOrResult(
+                arg, descriptor.argumentMarshaller(), observableTimestampTracker.getLong());
+
         ExecutionContext executionContext = new ExecutionContext(descriptor, metadataBuilder, argHolder);
 
         if (target instanceof AnyNodeJobTarget) {
@@ -685,7 +687,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal, StreamerReceive
                 deploymentUnits,
                 getReceiverJobClassName(options.executorType()),
                 ComputeEventMetadata.builder(Type.DATA_RECEIVER),
-                SharedComputeUtils.marshalArgOrResult(payload, null)
+                SharedComputeUtils.marshalArgOrResult(payload, null, observableTimestampTracker.getLong())
         );
 
         // Use Compute to execute receiver on the target node with failover, class loading, scheduling.
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 19b74080f48..5a4a062b7a6 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
 import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -181,7 +182,8 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest {
                 unitsContextManager,
                 computeExecutor,
                 computeConfiguration,
-                EventLog.NOOP
+                EventLog.NOOP,
+                HybridTimestampTracker.emptyTracker()
         );
 
         assertThat(computeComponent.startAsync(new ComponentContext()), willCompleteSuccessfully());
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 01882dc23d3..30e3f827b5d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1260,7 +1260,8 @@ public class IgniteImpl implements Ignite {
                 ),
                 computeExecutor,
                 computeCfg,
-                eventLog
+                eventLog,
+                observableTimestampTracker
         );
 
         systemViewManager.register(computeComponent);


Reply via email to