This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 14429f22394 IGNITE-25718 Propagate observableTs to compute jobs (#7411)
14429f22394 is described below
commit 14429f22394186912e3ae1d811143d78c6e03336
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Jan 16 14:41:48 2026 +0200
IGNITE-25718 Propagate observableTs to compute jobs (#7411)
Send observable timestamp with the compute job to ensure that the job sees
all changes made by the client.
---
.../client/proto/ClientComputeJobPacker.java | 31 ++++++++++++++----
.../client/proto/ClientComputeJobUnpacker.java | 24 ++++++++++----
.../client/proto/ProtocolBitmaskFeature.java | 7 +++-
.../internal/compute/ComputeJobDataHolder.java | 12 +------
.../proto/ClientComputeJobPackerUnpackerTest.java | 4 +--
.../ignite/client/handler/ItClientHandlerTest.java | 1 +
.../ignite/client/handler/ClientHandlerModule.java | 3 +-
.../handler/ClientInboundMessageHandler.java | 4 +--
.../ClientComputeExecuteColocatedRequest.java | 5 ++-
.../ClientComputeExecuteMapReduceRequest.java | 5 ++-
.../ClientComputeExecutePartitionedRequest.java | 5 ++-
.../compute/ClientComputeExecuteRequest.java | 5 ++-
.../ignite/internal/client/TcpClientChannel.java | 3 +-
.../internal/client/compute/ClientCompute.java | 30 +++++++++++-------
.../ignite/internal/compute/ItComputeBaseTest.java | 37 ++++++++++++++++++++++
.../internal/compute/ItComputeTestClient.java | 9 ++++++
.../ignite/internal/compute/utils/Clients.java | 16 +++++++---
.../internal/compute/ComputeComponentImpl.java | 9 +++++-
.../ignite/internal/compute/ExecutionContext.java | 19 +++++++++++
.../ignite/internal/compute/IgniteComputeImpl.java | 6 ++--
.../internal/compute/ComputeComponentImplTest.java | 4 ++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
22 files changed, 188 insertions(+), 54 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
index b7442ad323b..ddeb6f5d806 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobPacker.java
@@ -33,10 +33,15 @@ public final class ClientComputeJobPacker {
* @param arg Argument.
* @param marshaller Marshaller.
* @param packer Packer.
+ * @param observableTs Observable timestamp. Not packed when null.
* @param <T> Argument type.
*/
- public static <T> void packJobArgument(@Nullable T arg, @Nullable
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
- pack(arg, marshaller, packer);
+ public static <T> void packJobArgument(
+ @Nullable T arg,
+ @Nullable Marshaller<T, byte[]> marshaller,
+ ClientMessagePacker packer,
+ @Nullable Long observableTs) {
+ pack(arg, marshaller, packer, observableTs);
}
/**
@@ -49,7 +54,7 @@ public final class ClientComputeJobPacker {
* @param <T> Result type.
*/
public static <T> void packJobResult(@Nullable T res, @Nullable
Marshaller<T, byte[]> marshaller, ClientMessagePacker packer) {
- pack(res, marshaller, packer);
+ pack(res, marshaller, packer, null);
}
/**
@@ -61,8 +66,14 @@ public final class ClientComputeJobPacker {
* @param arg Job argument.
* @param platformComputeSupported Whether platform compute is supported.
* @param w Packer.
+ * @param observableTs Observable timestamp. Not packed when null.
*/
- public static <T, R> void packJob(JobDescriptor<T, R> descriptor, T arg,
boolean platformComputeSupported, ClientMessagePacker w) {
+ public static <T, R> void packJob(
+ JobDescriptor<T, R> descriptor,
+ T arg,
+ boolean platformComputeSupported,
+ ClientMessagePacker w,
+ @Nullable Long observableTs) {
w.packDeploymentUnits(descriptor.units());
w.packString(descriptor.jobClassName());
@@ -77,15 +88,23 @@ public final class ClientComputeJobPacker {
throw new IllegalArgumentException("Custom job executors are not
supported by the server: " + executorType);
}
- packJobArgument(arg, descriptor.argumentMarshaller(), w);
+ packJobArgument(arg, descriptor.argumentMarshaller(), w, observableTs);
}
/** Packs object in the format: | typeId | value |. */
- private static <T> void pack(@Nullable T obj, @Nullable Marshaller<T,
byte[]> marshaller, ClientMessagePacker packer) {
+ private static <T> void pack(
+ @Nullable T obj,
+ @Nullable Marshaller<T, byte[]> marshaller,
+ ClientMessagePacker packer,
+ @Nullable Long observableTs) {
ComputeJobDataHolder holder = obj instanceof ComputeJobDataHolder
? (ComputeJobDataHolder) obj
: SharedComputeUtils.marshalArgOrResult(obj, marshaller);
+ if (observableTs != null) {
+ packer.packLong(observableTs);
+ }
+
if (holder.data() == null) {
packer.packNil();
return;
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
index 05f4e293422..c049baf0c0a 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
@@ -25,6 +25,7 @@ import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.ComputeJobDataType;
import org.apache.ignite.internal.compute.SharedComputeUtils;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.marshalling.UnmarshallingException;
import org.jetbrains.annotations.Nullable;
@@ -46,15 +47,23 @@ public final class ClientComputeJobUnpacker {
@Nullable Marshaller<?, byte[]> marshaller,
@Nullable Class<?> resultClass
) {
- ComputeJobDataHolder holder =
unpackJobArgumentWithoutMarshaller(unpacker);
+ ComputeJobDataHolder holder =
unpackJobArgumentWithoutMarshaller(unpacker, false);
return SharedComputeUtils.unmarshalArgOrResult(holder, marshaller,
resultClass);
}
/** Unpacks compute job argument without marshaller. */
- public static @Nullable ComputeJobDataHolder
unpackJobArgumentWithoutMarshaller(ClientMessageUnpacker unpacker) {
+ public static @Nullable ComputeJobDataHolder
unpackJobArgumentWithoutMarshaller(
+ ClientMessageUnpacker unpacker,
+ boolean enableObservableTs) {
+ long observableTs = enableObservableTs
+ ? unpacker.unpackLong()
+ : HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+
if (unpacker.tryUnpackNil()) {
- return null;
+ return observableTs == HybridTimestamp.NULL_HYBRID_TIMESTAMP
+ ? null
+ : new ComputeJobDataHolder(ComputeJobDataType.NATIVE,
null, observableTs);
}
int typeId = unpacker.unpackInt();
@@ -63,11 +72,14 @@ public final class ClientComputeJobUnpacker {
throw new UnmarshallingException("Unsupported compute job type id:
" + typeId);
}
- return new ComputeJobDataHolder(type, unpacker.readBinary());
+ return new ComputeJobDataHolder(type, unpacker.readBinary(),
observableTs);
}
/** Unpacks compute job info. */
- public static Job unpackJob(ClientMessageUnpacker unpacker, boolean
enablePlatformJobs) {
+ public static Job unpackJob(
+ ClientMessageUnpacker unpacker,
+ boolean enablePlatformJobs,
+ boolean enableObservableTs) {
List<DeploymentUnit> deploymentUnits =
unpacker.unpackDeploymentUnits();
String jobClassName = unpacker.unpackString();
var options =
JobExecutionOptions.builder().priority(unpacker.unpackInt()).maxRetries(unpacker.unpackInt());
@@ -76,7 +88,7 @@ public final class ClientComputeJobUnpacker {
options.executorType(JobExecutorType.fromOrdinal(unpacker.unpackInt()));
}
- ComputeJobDataHolder args =
unpackJobArgumentWithoutMarshaller(unpacker);
+ ComputeJobDataHolder args =
unpackJobArgumentWithoutMarshaller(unpacker, enableObservableTs);
return new Job(deploymentUnits, jobClassName, options.build(), args);
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 858ccccbc20..8a4caa9a0ef 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -92,7 +92,12 @@ public enum ProtocolBitmaskFeature {
/**
* Thin SQL client supports iteration over the results of script execution.
*/
- SQL_MULTISTATEMENT_SUPPORT(13);
+ SQL_MULTISTATEMENT_SUPPORT(13),
+
+ /**
+ * Compute tasks and jobs accept observable timestamp from the client.
+ */
+ COMPUTE_OBSERVABLE_TS(14);
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
index 1c570970c36..fcfa0e72cfb 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/ComputeJobDataHolder.java
@@ -33,16 +33,6 @@ public class ComputeJobDataHolder {
private final @Nullable Long observableTimestamp;
- /**
- * Constructs a holder.
- *
- * @param type Job argument type.
- * @param data Marshalled data.
- */
- public ComputeJobDataHolder(ComputeJobDataType type, byte @Nullable []
data) {
- this(type, data, null);
- }
-
/**
* Constructs a holder.
*
@@ -69,7 +59,7 @@ public class ComputeJobDataHolder {
*
* @return Marshalled data.
*/
- public byte[] data() {
+ public byte @Nullable [] data() {
return data;
}
diff --git
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
index aabf9ff3034..893608f8602 100644
---
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
+++
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
@@ -145,12 +145,12 @@ class ClientComputeJobPackerUnpackerTest {
@ParameterizedTest
void notMarshalledArgument(Object arg, ComputeJobDataType type) {
// When pack job argument without marshaller.
- packJobArgument(arg, null, messagePacker);
+ packJobArgument(arg, null, messagePacker, null);
byte[] data = ByteBufUtil.getBytes(messagePacker.getBuffer());
// And unpack without marshaller.
try (var messageUnpacker = messageUnpacker(data)) {
- var res = unpackJobArgumentWithoutMarshaller(messageUnpacker);
+ var res = unpackJobArgumentWithoutMarshaller(messageUnpacker,
false);
// Then argument is unpacked but not unmarshalled.
ComputeJobDataHolder argument =
assertInstanceOf(ComputeJobDataHolder.class, res);
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 91c19c741e6..b8ad0f0e118 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -559,6 +559,7 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
expected.set(11);
expected.set(12);
expected.set(13);
+ expected.set(14);
assertEquals(expected, supportedFeatures);
var extensionsLen = unpacker.unpackInt();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 4de86ddb0e6..9389c76898a 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -97,7 +97,8 @@ public class ClientHandlerModule implements IgniteComponent,
PlatformComputeTran
ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS,
ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
- ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT
+ ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
+ ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
));
/** Connection id generator.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 129bbe60ffe..a15077c7573 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -1414,11 +1414,11 @@ public class ClientInboundMessageHandler
packer.packString(jobClassName);
packDeploymentUnitPaths(ctx.deploymentUnits(), packer);
packer.packBoolean(false); // Retain deployment units
in cache.
- ClientComputeJobPacker.packJobArgument(arg, null,
packer);
+ ClientComputeJobPacker.packJobArgument(arg, null,
packer, null);
})
.thenApply(unpacker -> {
try (unpacker) {
- return
ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller(unpacker);
+ return
ClientComputeJobUnpacker.unpackJobArgumentWithoutMarshaller(unpacker, false);
}
});
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
index f2bb27b1e57..14ba2b050fd 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteColocatedRequest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
@@ -72,7 +73,9 @@ public class ClientComputeExecuteColocatedRequest {
BitSet noValueSet = in.unpackBitSet();
byte[] tupleBytes = in.readBinary();
- Job job = unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+ boolean enablePlatformJobs =
clientContext.hasFeature(PLATFORM_COMPUTE_JOB);
+ boolean enableObservableTs =
clientContext.hasFeature(COMPUTE_OBSERVABLE_TS);
+ Job job = unpackJob(in, enablePlatformJobs, enableObservableTs);
unpackTaskId(in, clientContext.hasFeature(COMPUTE_TASK_ID)); //
Placeholder for a possible future usage
return readTableAsync(tableId, tables).thenCompose(table ->
readTuple(schemaId, noValueSet, tupleBytes, table, true)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index ccecbf120bc..08a4bac84d9 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.client.proto.ClientComputeJobPacker;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.HybridTimestampProvider;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
@@ -70,7 +71,9 @@ public class ClientComputeExecuteMapReduceRequest {
) {
List<DeploymentUnit> deploymentUnits = in.unpackDeploymentUnits();
String taskClassName = in.unpackString();
- ComputeJobDataHolder arg = unpackJobArgumentWithoutMarshaller(in);
+
+ boolean enableObservableTs =
clientContext.hasFeature(ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS);
+ ComputeJobDataHolder arg = unpackJobArgumentWithoutMarshaller(in,
enableObservableTs);
TaskDescriptor<Object, Object> taskDescriptor =
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
index 358d721bae6..e5ee64bf916 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecutePartitionedRequest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.client.handler.requests.compute.ClientComputeExe
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
@@ -67,7 +68,9 @@ public class ClientComputeExecutePartitionedRequest {
int tableId = in.unpackInt();
int partitionId = in.unpackInt();
- Job job = unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+ boolean enablePlatformJobs =
clientContext.hasFeature(PLATFORM_COMPUTE_JOB);
+ boolean enableObservableTs =
clientContext.hasFeature(COMPUTE_OBSERVABLE_TS);
+ Job job = unpackJob(in, enablePlatformJobs, enableObservableTs);
UUID taskId = unpackTaskId(in,
clientContext.hasFeature(COMPUTE_TASK_ID));
return readTableAsync(tableId, tables).thenCompose(table -> {
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 15a58a1e667..ed925e27a24 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.client.handler.requests.cluster.ClientClusterGet
import static
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStateRequest.packJobState;
import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackJob;
import static
org.apache.ignite.internal.client.proto.ClientComputeJobUnpacker.unpackTaskId;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.COMPUTE_TASK_ID;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
@@ -79,7 +80,9 @@ public class ClientComputeExecuteRequest {
) {
Set<InternalClusterNode> candidates = unpackCandidateNodes(in,
cluster);
- Job job = unpackJob(in,
clientContext.hasFeature(PLATFORM_COMPUTE_JOB));
+ boolean enablePlatformJobs =
clientContext.hasFeature(PLATFORM_COMPUTE_JOB);
+ boolean enableObservableTs =
clientContext.hasFeature(COMPUTE_OBSERVABLE_TS);
+ Job job = unpackJob(in, enablePlatformJobs, enableObservableTs);
UUID taskId = unpackTaskId(in,
clientContext.hasFeature(COMPUTE_TASK_ID));
ComputeEventMetadataBuilder metadataBuilder =
ComputeEventMetadata.builder(taskId != null ? Type.BROADCAST : Type.SINGLE)
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 30a6968d9ae..b8ce92e43c5 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -98,7 +98,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS,
ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
- ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT
+ ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
+ ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
));
/** Minimum supported heartbeat interval. */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index bd512b53d98..fa258e857cf 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -317,7 +317,7 @@ public class ClientCompute implements IgniteCompute {
private <T, R> CompletableFuture<SubmitTaskResult>
doExecuteMapReduceAsync(TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
return ch.serviceAsync(
ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
- w -> packTask(w.out(), taskDescriptor, arg),
+ w -> packTask(w, taskDescriptor, arg),
ClientCompute::unpackSubmitTaskResult,
(String) null,
null,
@@ -362,7 +362,7 @@ public class ClientCompute implements IgniteCompute {
return iterator.next();
}
- private static <K, T, R> CompletableFuture<SubmitResult>
executeColocatedObjectKey(
+ private <K, T, R> CompletableFuture<SubmitResult>
executeColocatedObjectKey(
ClientTable t,
K key,
Mapper<K> keyMapper,
@@ -379,7 +379,7 @@ public class ClientCompute implements IgniteCompute {
);
}
- private static <T, R> CompletableFuture<SubmitResult>
executeColocatedTupleKey(
+ private <T, R> CompletableFuture<SubmitResult> executeColocatedTupleKey(
ClientTable t,
Tuple key,
JobDescriptor<T, R> descriptor,
@@ -395,7 +395,7 @@ public class ClientCompute implements IgniteCompute {
);
}
- private static <T, R> CompletableFuture<SubmitResult>
executeColocatedInternal(
+ private <T, R> CompletableFuture<SubmitResult> executeColocatedInternal(
ClientTable t,
BiConsumer<PayloadOutputChannel, ClientSchema> keyWriter,
PartitionAwarenessProvider partitionAwarenessProvider,
@@ -438,7 +438,7 @@ public class ClientCompute implements IgniteCompute {
.thenCompose(Function.identity()));
}
- private static <T, R> CompletableFuture<SubmitResult> executePartitioned(
+ private <T, R> CompletableFuture<SubmitResult> executePartitioned(
ClientTable t,
Partition partition,
JobDescriptor<T, R> descriptor,
@@ -520,11 +520,11 @@ public class ClientCompute implements IgniteCompute {
}
}
- private static <T, R> void packJob(PayloadOutputChannel out,
JobDescriptor<T, R> descriptor, T arg) {
+ private <T, R> void packJob(PayloadOutputChannel out, JobDescriptor<T, R>
descriptor, T arg) {
boolean platformComputeSupported =
out.clientChannel().protocolContext()
.isFeatureSupported(ProtocolBitmaskFeature.PLATFORM_COMPUTE_JOB);
- ClientComputeJobPacker.packJob(descriptor, arg,
platformComputeSupported, out.out());
+ ClientComputeJobPacker.packJob(descriptor, arg,
platformComputeSupported, out.out(), getJobObservableTs(out));
}
private static void packTaskId(PayloadOutputChannel out, @Nullable UUID
taskId) {
@@ -533,10 +533,18 @@ public class ClientCompute implements IgniteCompute {
}
}
- private static <T, R> void packTask(ClientMessagePacker w,
TaskDescriptor<T, R> taskDescriptor, @Nullable T arg) {
- w.packDeploymentUnits(taskDescriptor.units());
- w.packString(taskDescriptor.taskClassName());
- ClientComputeJobPacker.packJobArgument(arg,
taskDescriptor.splitJobArgumentMarshaller(), w);
+ private <T, R> void packTask(PayloadOutputChannel w, TaskDescriptor<T, R>
taskDescriptor, @Nullable T arg) {
+ w.out().packDeploymentUnits(taskDescriptor.units());
+ w.out().packString(taskDescriptor.taskClassName());
+ ClientComputeJobPacker.packJobArgument(arg,
taskDescriptor.splitJobArgumentMarshaller(), w.out(), getJobObservableTs(w));
+ }
+
+ private @Nullable Long getJobObservableTs(PayloadOutputChannel w) {
+ if
(!w.clientChannel().protocolContext().isFeatureSupported(ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS))
{
+ return null;
+ }
+
+ return ch.observableTimestamp().getLong();
}
/**
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 8538d0ec88a..4bbb4c75626 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.compute;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.compute.JobStatus.CANCELED;
@@ -26,6 +27,7 @@ import static org.apache.ignite.compute.JobStatus.FAILED;
import static org.apache.ignite.compute.JobStatus.QUEUED;
import static org.apache.ignite.internal.IgniteExceptionTestUtils.hasMessage;
import static
org.apache.ignite.internal.IgniteExceptionTestUtils.traceableException;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
@@ -52,6 +54,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
@@ -70,9 +73,11 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
import org.apache.ignite.compute.ComputeException;
+import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.compute.TaskDescriptor;
@@ -80,6 +85,7 @@ import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.ConfigOverride;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
@@ -898,6 +904,30 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
}
}
+ @Test
+ public void observableTsIsPropagatedToTargetNode() {
+ // Bump observable timestamp.
+ createTestTableWithOneRow();
+
+ HybridTimestamp localObservableTs = currentObservableTimestamp();
+ assertNotNull(localObservableTs);
+
+ JobExecution<Long> execution = submit(
+ JobTarget.node(clusterNode(node(1))),
+
JobDescriptor.builder(ObservableTimestampJob.class).units(units()).build(),
+ null
+ );
+
+ Long jobRes = execution.resultAsync().join();
+ HybridTimestamp jobObservableTs =
HybridTimestamp.nullableHybridTimestamp(jobRes);
+
+ assertThat(jobObservableTs, is(localObservableTs));
+ }
+
+ protected @Nullable HybridTimestamp currentObservableTimestamp() {
+ return unwrapIgniteImpl(node(0)).observableTimeTracker().get();
+ }
+
static Class<ToStringJob> toStringJobClass() {
return ToStringJob.class;
}
@@ -933,4 +963,11 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
.or(instanceOf(CancellationException.class))
);
}
+
+ private static class ObservableTimestampJob implements ComputeJob<Object,
Long> {
+ @Override
+ public CompletableFuture<Long> executeAsync(JobExecutionContext
context, Object arg) {
+ return
completedFuture(unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong());
+ }
+ }
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java
index df96f7eb69a..1502718c99f 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestClient.java
@@ -18,7 +18,10 @@
package org.apache.ignite.internal.compute;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.compute.utils.Clients;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
/**
@@ -42,4 +45,10 @@ public class ItComputeTestClient extends
ItComputeTestEmbedded {
void cancelsNotCancellableJob(boolean local) {
// No-op. Embedded-specific.
}
+
+ @Override
+ protected @Nullable HybridTimestamp currentObservableTimestamp() {
+ TcpIgniteClient client = (TcpIgniteClient) clients.client(node(0));
+ return client.channel().observableTimestamp().get();
+ }
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java
index dd889a5e402..b829d945a4c 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/Clients.java
@@ -31,6 +31,17 @@ import org.apache.ignite.compute.IgniteCompute;
public class Clients {
private final Map<String, IgniteClient> clients = new HashMap<>();
+ /**
+ * Gets thin client referenced by the embedded node.
+ *
+ * @param node Node to get the client for.
+ * @return Thin client.
+ */
+ public Ignite client(Ignite node) {
+ String address = "127.0.0.1:" +
unwrapIgniteImpl(node).clientAddress().port();
+ return clients.computeIfAbsent(address, addr ->
IgniteClient.builder().addresses(addr).build());
+ }
+
/**
* Gets compute API for the thin client referenced by the embedded node.
*
@@ -38,10 +49,7 @@ public class Clients {
* @return Compute API.
*/
public IgniteCompute compute(Ignite node) {
- String address = "127.0.0.1:" +
unwrapIgniteImpl(node).clientAddress().port();
- //noinspection resource
- IgniteClient client = clients.computeIfAbsent(address, addr ->
IgniteClient.builder().addresses(addr).build());
- return client.compute();
+ return client(node).compute();
}
/**
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 1eea4f59e18..ab6e966b019 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.future.InFlightFutures;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -89,6 +90,8 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
private final EventLog eventLog;
+ private final HybridTimestampTracker observableTimestampTracker;
+
private final ComputeMessaging messaging;
private final ExecutionManager executionManager;
@@ -108,13 +111,15 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
UnitsContextManager jobContextManager,
ComputeExecutor executor,
ComputeConfiguration computeConfiguration,
- EventLog eventLog
+ EventLog eventLog,
+ HybridTimestampTracker observableTimestampTracker
) {
this.topologyService = topologyService;
this.logicalTopologyService = logicalTopologyService;
this.jobContextManager = jobContextManager;
this.executor = executor;
this.eventLog = eventLog;
+ this.observableTimestampTracker = observableTimestampTracker;
executionManager = new ExecutionManager(computeConfiguration,
topologyService);
messaging = new ComputeMessaging(executionManager, messagingService,
topologyService);
failoverExecutor = Executors.newSingleThreadExecutor(
@@ -132,6 +137,8 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
}
try {
+
observableTimestampTracker.update(executionContext.observableTimestamp());
+
CompletableFuture<UnitsClassLoaderContext> classLoaderFut =
jobContextManager.acquireClassLoader(executionContext.units(),
executionContext.jobClassName());
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
index 5a97a13d26d..a65a610eb00 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
@@ -22,6 +22,7 @@ import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.jetbrains.annotations.Nullable;
/**
@@ -115,4 +116,22 @@ public class ExecutionContext {
public ComputeJobDataHolder arg() {
return arg;
}
+
+ /**
+ * Gets the observable timestamp from the job initiator client.
+ * This ensures that the job sees the changes made by the client up to the
point of job submission.
+ *
+ * @return Observable timestamp or {@link
HybridTimestamp#NULL_HYBRID_TIMESTAMP} if not set.
+ */
+ public long observableTimestamp() {
+ if (arg == null) {
+ return HybridTimestamp.NULL_HYBRID_TIMESTAMP;
+ }
+
+ Long ts = arg.observableTimestamp();
+
+ return ts == null
+ ? HybridTimestamp.NULL_HYBRID_TIMESTAMP
+ : ts;
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index c95f63a986b..8c48e308eac 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -157,7 +157,9 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
Objects.requireNonNull(target);
Objects.requireNonNull(descriptor);
- ComputeJobDataHolder argHolder =
SharedComputeUtils.marshalArgOrResult(arg, descriptor.argumentMarshaller());
+ ComputeJobDataHolder argHolder = SharedComputeUtils.marshalArgOrResult(
+ arg, descriptor.argumentMarshaller(),
observableTimestampTracker.getLong());
+
ExecutionContext executionContext = new ExecutionContext(descriptor,
metadataBuilder, argHolder);
if (target instanceof AnyNodeJobTarget) {
@@ -685,7 +687,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
deploymentUnits,
getReceiverJobClassName(options.executorType()),
ComputeEventMetadata.builder(Type.DATA_RECEIVER),
- SharedComputeUtils.marshalArgOrResult(payload, null)
+ SharedComputeUtils.marshalArgOrResult(payload, null,
observableTimestampTracker.getLong())
);
// Use Compute to execute receiver on the target node with failover,
class loading, scheduling.
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 19b74080f48..5a4a062b7a6 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -94,6 +94,7 @@ import
org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -181,7 +182,8 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
unitsContextManager,
computeExecutor,
computeConfiguration,
- EventLog.NOOP
+ EventLog.NOOP,
+ HybridTimestampTracker.emptyTracker()
);
assertThat(computeComponent.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 01882dc23d3..30e3f827b5d 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1260,7 +1260,8 @@ public class IgniteImpl implements Ignite {
),
computeExecutor,
computeCfg,
- eventLog
+ eventLog,
+ observableTimestampTracker
);
systemViewManager.register(computeComponent);