This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 02f8f5d9a68 IGNITE-27594 Optimize observable timestamp handling in 
compute jobs (#7611)
02f8f5d9a68 is described below

commit 02f8f5d9a6895446574cff2c6c24b0066223a917
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Mar 24 06:00:29 2026 +0300

    IGNITE-27594 Optimize observable timestamp handling in compute jobs (#7611)
---
 modules/compute/build.gradle                       |   1 +
 modules/compute/jobs.gradle                        |   3 +
 .../ignite/internal/compute/ItComputeBaseTest.java |  32 +--
 .../jobs/embedded/ObservableTimestampJob.java      |  40 ++++
 .../jobs/embedded/ObservableTimestampResult.java   |  34 +++
 .../jobs/standalone/ObservableTimestampJob.java    |  40 ++++
 .../jobs/standalone/ObservableTimestampResult.java |  34 +++
 .../internal/compute/ComputeComponentImpl.java     |   9 +-
 .../internal/compute/ComputeIgniteFactory.java     |  40 ++++
 .../ignite/internal/compute/ExecutionContext.java  |   3 +-
 .../ignite/internal/compute/IgniteComputeImpl.java |   5 +-
 .../compute/executor/ComputeExecutorImpl.java      |  22 +-
 .../internal/compute/ComputeComponentImplTest.java |  13 +-
 .../compute/executor/ComputeExecutorTest.java      |   9 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |  11 +-
 .../ignite/internal/app/JobScopedIgnite.java       | 103 +++++++++
 .../ignite/internal/sql/api/IgniteSqlImpl.java     |  64 +++++-
 .../internal/sql/api/JobScopedIgniteSql.java       | 231 +++++++++++++++++++++
 .../internal/tx/impl/IgniteTransactionsImpl.java   |   5 +
 19 files changed, 658 insertions(+), 41 deletions(-)

diff --git a/modules/compute/build.gradle b/modules/compute/build.gradle
index 0be2d5189ff..f874b6f5c2b 100644
--- a/modules/compute/build.gradle
+++ b/modules/compute/build.gradle
@@ -66,6 +66,7 @@ dependencies {
     integrationTestImplementation project(':ignite-system-view-api')
     integrationTestImplementation project(':ignite-client-common')
     integrationTestImplementation project(':ignite-eventlog')
+    integrationTestImplementation project(':ignite-transactions')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
 }
diff --git a/modules/compute/jobs.gradle b/modules/compute/jobs.gradle
index b897fafdb22..50e758e70b8 100644
--- a/modules/compute/jobs.gradle
+++ b/modules/compute/jobs.gradle
@@ -50,6 +50,9 @@ processIntegrationTestResources {
 dependencies {
     jobsImplementation project(':ignite-api')
     jobsImplementation project(':ignite-core')
+    jobsImplementation project(':ignite-transactions')
+    jobsImplementation project(':ignite-runner')
+    jobsImplementation testFixtures(project(':ignite-runner'))
     unit1Implementation project(':ignite-api')
     unit2Implementation project(':ignite-api')
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 68aee720172..63147f204cf 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.compute;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.compute.JobStatus.CANCELED;
@@ -50,6 +49,7 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -73,11 +73,9 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.BroadcastExecution;
 import org.apache.ignite.compute.BroadcastJobTarget;
 import org.apache.ignite.compute.ComputeException;
-import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobTarget;
 import org.apache.ignite.compute.TaskDescriptor;
@@ -96,6 +94,7 @@ import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.table.partition.Partition;
+import org.example.jobs.embedded.ObservableTimestampResult;
 import org.hamcrest.Matcher;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
@@ -892,16 +891,28 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         HybridTimestamp localObservableTs = currentObservableTimestamp();
         assertNotNull(localObservableTs);
 
-        JobExecution<Long> execution = submit(
+        // Capture the target node's global tracker before the job runs.
+        HybridTimestamp targetNodeTsBefore = 
unwrapIgniteImpl(node(1)).observableTimeTracker().get();
+
+        JobExecution<ObservableTimestampResult> execution = submit(
                 JobTarget.node(clusterNode(node(1))),
-                
JobDescriptor.builder(ObservableTimestampJob.class).units(units()).build(),
+                JobDescriptor.<Void, 
ObservableTimestampResult>builder(jobClassName("ObservableTimestampJob"))
+                        .resultClass(ObservableTimestampResult.class)
+                        .units(units())
+                        .build(),
                 null
         );
 
-        Long jobRes = execution.resultAsync().join();
-        HybridTimestamp jobObservableTs = 
HybridTimestamp.nullableHybridTimestamp(jobRes);
+        ObservableTimestampResult jobRes = execution.resultAsync().join();
 
+        // The per-job tracker should have the client's observable timestamp.
+        HybridTimestamp jobObservableTs = 
HybridTimestamp.nullableHybridTimestamp(jobRes.perJobTimestamp);
         assertThat(jobObservableTs, is(localObservableTs));
+
+        // The node's global tracker should NOT be updated by the compute job.
+        HybridTimestamp targetNodeTsAfter = 
HybridTimestamp.nullableHybridTimestamp(jobRes.nodeGlobalTimestamp);
+        assertThat(targetNodeTsAfter, is(targetNodeTsBefore));
+        assertThat(targetNodeTsAfter, not(jobObservableTs));
     }
 
     protected @Nullable HybridTimestamp currentObservableTimestamp() {
@@ -980,11 +991,4 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
                                 .or(instanceOf(CancellationException.class))
                 );
     }
-
-    private static class ObservableTimestampJob implements ComputeJob<Object, 
Long> {
-        @Override
-        public CompletableFuture<Long> executeAsync(JobExecutionContext 
context, Object arg) {
-            return 
completedFuture(unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong());
-        }
-    }
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampJob.java
 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampJob.java
new file mode 100644
index 00000000000..716c4b15868
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.TestWrappers.unwrapIgniteTransactionsImpl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Compute job that returns both the per-job scoped observable timestamp and 
the node's global observable timestamp.
+ * Used to verify that compute jobs receive the client's observable timestamp 
without polluting the node's global tracker.
+ */
+public class ObservableTimestampJob implements ComputeJob<Void, 
ObservableTimestampResult> {
+    @Override
+    public CompletableFuture<ObservableTimestampResult> 
executeAsync(JobExecutionContext context, Void arg) {
+        long perJobTs = 
unwrapIgniteTransactionsImpl(context.ignite().transactions()).observableTimestampTracker().getLong();
+        long nodeGlobalTs = 
unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong();
+
+        return completedFuture(new ObservableTimestampResult(perJobTs, 
nodeGlobalTs));
+    }
+}
diff --git 
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampResult.java
 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampResult.java
new file mode 100644
index 00000000000..73e1df6299b
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/ObservableTimestampResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+/**
+ * Result of {@link ObservableTimestampJob} containing both per-job and node 
global observable timestamps.
+ */
+public class ObservableTimestampResult {
+    public long perJobTimestamp;
+    public long nodeGlobalTimestamp;
+
+    public ObservableTimestampResult() {
+    }
+
+    public ObservableTimestampResult(long perJobTimestamp, long 
nodeGlobalTimestamp) {
+        this.perJobTimestamp = perJobTimestamp;
+        this.nodeGlobalTimestamp = nodeGlobalTimestamp;
+    }
+}
diff --git 
a/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampJob.java
 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampJob.java
new file mode 100644
index 00000000000..508a01245e5
--- /dev/null
+++ 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.TestWrappers.unwrapIgniteTransactionsImpl;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+
+/**
+ * Compute job that returns both the per-job scoped observable timestamp and 
the node's global observable timestamp.
+ * Used to verify that compute jobs receive the client's observable timestamp 
without polluting the node's global tracker.
+ */
+public class ObservableTimestampJob implements ComputeJob<Void, 
ObservableTimestampResult> {
+    @Override
+    public CompletableFuture<ObservableTimestampResult> 
executeAsync(JobExecutionContext context, Void arg) {
+        long perJobTs = 
unwrapIgniteTransactionsImpl(context.ignite().transactions()).observableTimestampTracker().getLong();
+        long nodeGlobalTs = 
unwrapIgniteImpl(context.ignite()).observableTimeTracker().getLong();
+
+        return completedFuture(new ObservableTimestampResult(perJobTs, 
nodeGlobalTs));
+    }
+}
diff --git 
a/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampResult.java
 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampResult.java
new file mode 100644
index 00000000000..975e3a1bc08
--- /dev/null
+++ 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/ObservableTimestampResult.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+/**
+ * Result of {@link ObservableTimestampJob} containing both per-job and node 
global observable timestamps.
+ */
+public class ObservableTimestampResult {
+    public long perJobTimestamp;
+    public long nodeGlobalTimestamp;
+
+    public ObservableTimestampResult() {
+    }
+
+    public ObservableTimestampResult(long perJobTimestamp, long 
nodeGlobalTimestamp) {
+        this.perJobTimestamp = perJobTimestamp;
+        this.nodeGlobalTimestamp = nodeGlobalTimestamp;
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index ab6e966b019..1eea4f59e18 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -47,7 +47,6 @@ import 
org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
 import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.future.InFlightFutures;
-import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -90,8 +89,6 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
 
     private final EventLog eventLog;
 
-    private final HybridTimestampTracker observableTimestampTracker;
-
     private final ComputeMessaging messaging;
 
     private final ExecutionManager executionManager;
@@ -111,15 +108,13 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             UnitsContextManager jobContextManager,
             ComputeExecutor executor,
             ComputeConfiguration computeConfiguration,
-            EventLog eventLog,
-            HybridTimestampTracker observableTimestampTracker
+            EventLog eventLog
     ) {
         this.topologyService = topologyService;
         this.logicalTopologyService = logicalTopologyService;
         this.jobContextManager = jobContextManager;
         this.executor = executor;
         this.eventLog = eventLog;
-        this.observableTimestampTracker = observableTimestampTracker;
         executionManager = new ExecutionManager(computeConfiguration, 
topologyService);
         messaging = new ComputeMessaging(executionManager, messagingService, 
topologyService);
         failoverExecutor = Executors.newSingleThreadExecutor(
@@ -137,8 +132,6 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
         }
 
         try {
-            
observableTimestampTracker.update(executionContext.observableTimestamp());
-
             CompletableFuture<UnitsClassLoaderContext> classLoaderFut =
                     
jobContextManager.acquireClassLoader(executionContext.units(), 
executionContext.jobClassName());
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeIgniteFactory.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeIgniteFactory.java
new file mode 100644
index 00000000000..6ae7f9fb796
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeIgniteFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+
+/**
+ * Factory for creating per-job scoped {@link Ignite} instances.
+ *
+ * <p>Each compute job carries an observable timestamp from its initiator 
client.
+ * Instead of updating a global tracker (which would cause "timestamp 
pollution"
+ * across unrelated jobs), a per-job {@link HybridTimestampTracker} is created
+ * and used to scope the {@link Ignite} instance given to the job.
+ */
+@FunctionalInterface
+public interface ComputeIgniteFactory {
+    /**
+     * Creates an {@link Ignite} instance scoped to a specific compute job.
+     *
+     * @param tracker Per-job hybrid timestamp tracker.
+     * @return An Ignite instance that uses the given tracker for transactions 
and SQL.
+     */
+    Ignite createForJob(HybridTimestampTracker tracker);
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
index a65a610eb00..606958ccb49 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionContext.java
@@ -121,9 +121,10 @@ public class ExecutionContext {
      * Gets the observable timestamp from the job initiator client.
      * This ensures that the job sees the changes made by the client up to the 
point of job submission.
      *
+     * @param arg Job argument.
      * @return Observable timestamp or {@link 
HybridTimestamp#NULL_HYBRID_TIMESTAMP} if not set.
      */
-    public long observableTimestamp() {
+    public static long observableTimestamp(@Nullable ComputeJobDataHolder arg) 
{
         if (arg == null) {
             return HybridTimestamp.NULL_HYBRID_TIMESTAMP;
         }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 533f0b927a4..b5e7f1c4e46 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -403,13 +403,12 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
             ExecutionContext executionContext,
             @Nullable CancellationToken cancellationToken
     ) {
-        Set<InternalClusterNode> candidates1 = new HashSet<>();
+        Set<InternalClusterNode> candidates = new HashSet<>();
         for (InternalClusterNode node : nodes) {
             if (topologyService.getByConsistentId(node.name()) != null) {
-                candidates1.add(node);
+                candidates.add(node);
             }
         }
-        Set<InternalClusterNode> candidates = candidates1;
         if (candidates.isEmpty()) {
             Set<String> nodeNames = 
nodes.stream().map(InternalClusterNode::name).collect(Collectors.toSet());
             return failedFuture(new NodeNotFoundException(nodeNames));
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index da589a47294..8f4911b577b 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -21,6 +21,8 @@ import static 
org.apache.ignite.internal.compute.ComputeUtils.getJobExecuteArgum
 import static org.apache.ignite.internal.compute.ComputeUtils.jobClass;
 import static org.apache.ignite.internal.compute.ComputeUtils.taskClass;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.unmarshalOrNotIfNull;
+import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
+import static 
org.apache.ignite.internal.hlc.HybridTimestampTracker.atomicTracker;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 
@@ -34,9 +36,11 @@ import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobExecutorType;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecutionContext;
+import org.apache.ignite.internal.compute.ComputeIgniteFactory;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.ComputeJobDataType;
 import org.apache.ignite.internal.compute.ComputeUtils;
+import org.apache.ignite.internal.compute.ExecutionContext;
 import org.apache.ignite.internal.compute.ExecutionOptions;
 import org.apache.ignite.internal.compute.JobExecutionContextImpl;
 import org.apache.ignite.internal.compute.SharedComputeUtils;
@@ -53,6 +57,7 @@ import 
org.apache.ignite.internal.compute.task.TaskExecutionInternal;
 import org.apache.ignite.internal.deployunit.loader.UnitsClassLoader;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.TopologyService;
@@ -68,6 +73,8 @@ public class ComputeExecutorImpl implements ComputeExecutor {
 
     private final Ignite ignite;
 
+    private final ComputeIgniteFactory igniteFactory;
+
     private final ComputeConfiguration configuration;
 
     private final ComputeStateMachine stateMachine;
@@ -86,6 +93,7 @@ public class ComputeExecutorImpl implements ComputeExecutor {
      * Constructor.
      *
      * @param ignite Ignite instance for public API access.
+     * @param igniteFactory Factory for creating per-job scoped Ignite 
instances.
      * @param stateMachine Compute jobs state machine.
      * @param configuration Compute configuration.
      * @param topologyService Topology service.
@@ -93,6 +101,7 @@ public class ComputeExecutorImpl implements ComputeExecutor {
      */
     public ComputeExecutorImpl(
             Ignite ignite,
+            ComputeIgniteFactory igniteFactory,
             ComputeStateMachine stateMachine,
             ComputeConfiguration configuration,
             TopologyService topologyService,
@@ -100,6 +109,7 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
             EventLog eventLog
     ) {
         this.ignite = ignite;
+        this.igniteFactory = igniteFactory;
         this.configuration = configuration;
         this.stateMachine = stateMachine;
         this.topologyService = topologyService;
@@ -121,8 +131,9 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
     ) {
         assert executorService != null;
 
+        Ignite scopedIgnite = createIgniteForJob(arg);
         AtomicBoolean isInterrupted = new AtomicBoolean();
-        JobExecutionContext context = new JobExecutionContextImpl(ignite, 
isInterrupted, classLoader, options.partition());
+        JobExecutionContext context = new 
JobExecutionContextImpl(scopedIgnite, isInterrupted, classLoader, 
options.partition());
 
         metadataBuilder
                 .jobClassName(jobClassName)
@@ -143,6 +154,15 @@ public class ComputeExecutorImpl implements 
ComputeExecutor {
         return new JobExecutionInternal<>(execution, isInterrupted, null, 
false, topologyService.localMember());
     }
 
+    /**
+     * Extracts observable timestamp from the client payload, creates a 
per-job tracker and a wrapper for the Ignite instance.
+     */
+    private Ignite createIgniteForJob(@Nullable ComputeJobDataHolder arg) {
+        long obsTs = ExecutionContext.observableTimestamp(arg);
+        HybridTimestampTracker jobTracker = 
atomicTracker(nullableHybridTimestamp(obsTs));
+        return igniteFactory.createForJob(jobTracker);
+    }
+
     private static Callable<CompletableFuture<ComputeJobDataHolder>> 
addObservableTimestamp(
             Callable<CompletableFuture<ComputeJobDataHolder>> jobCallable,
             ClockService clockService) {
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 27d6f640250..bb07bfc6875 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -94,7 +94,6 @@ import 
org.apache.ignite.internal.deployunit.loader.UnitsClassLoaderContext;
 import org.apache.ignite.internal.deployunit.loader.UnitsContextManager;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.hlc.HybridTimestampTracker;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -172,7 +171,14 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
 
         InMemoryComputeStateMachine stateMachine = new 
InMemoryComputeStateMachine(computeConfiguration, INSTANCE_NAME);
         ComputeExecutor computeExecutor = new ComputeExecutorImpl(
-                ignite, stateMachine, computeConfiguration, topologyService, 
new TestClockService(new HybridClockImpl()), EventLog.NOOP);
+                ignite,
+                tracker -> ignite,
+                stateMachine,
+                computeConfiguration,
+                topologyService,
+                new TestClockService(new HybridClockImpl()),
+                EventLog.NOOP
+        );
 
         computeComponent = new ComputeComponentImpl(
                 INSTANCE_NAME,
@@ -182,8 +188,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
                 unitsContextManager,
                 computeExecutor,
                 computeConfiguration,
-                EventLog.NOOP,
-                HybridTimestampTracker.emptyTracker()
+                EventLog.NOOP
         );
 
         assertThat(computeComponent.startAsync(new ComponentContext()), 
willCompleteSuccessfully());
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index ab113c518c1..4792737a2c2 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -85,7 +85,14 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
     void setUp() {
         InMemoryComputeStateMachine stateMachine = new 
InMemoryComputeStateMachine(computeConfiguration, "testNode");
         computeExecutor = new ComputeExecutorImpl(
-                ignite, stateMachine, computeConfiguration, topologyService, 
new TestClockService(new HybridClockImpl()), EventLog.NOOP);
+                ignite,
+                tracker -> ignite,
+                stateMachine,
+                computeConfiguration,
+                topologyService,
+                new TestClockService(new HybridClockImpl()),
+                EventLog.NOOP
+        );
 
         computeExecutor.start();
     }
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index a3ccf925804..56fd3527233 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -331,6 +331,8 @@ public class IgniteImpl implements Ignite {
 
     private final Path workDir;
 
+    private final Executor asyncContinuationExecutor;
+
     /** Lifecycle manager. */
     private final LifecycleManager lifecycleManager;
 
@@ -553,6 +555,7 @@ public class IgniteImpl implements Ignite {
     ) {
         this.name = node.name();
         this.workDir = workDir;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
 
         longJvmPauseDetector = new LongJvmPauseDetector(name);
 
@@ -1262,6 +1265,7 @@ public class IgniteImpl implements Ignite {
         InMemoryComputeStateMachine stateMachine = new 
InMemoryComputeStateMachine(computeCfg, name);
         ComputeExecutorImpl computeExecutor = new ComputeExecutorImpl(
                 this,
+                this::createJobScopedIgnite,
                 stateMachine,
                 computeCfg,
                 clusterSvc.topologyService(),
@@ -1293,8 +1297,7 @@ public class IgniteImpl implements Ignite {
                 ),
                 computeExecutor,
                 computeCfg,
-                eventLog,
-                observableTimestampTracker
+                eventLog
         );
 
         systemViewManager.register(computeComponent);
@@ -1362,6 +1365,10 @@ public class IgniteImpl implements Ignite {
         publicCluster = new PublicApiThreadingIgniteCluster(new 
IgniteClusterImpl(clusterSvc.topologyService(), clusterIdService));
     }
 
+    private JobScopedIgnite createJobScopedIgnite(HybridTimestampTracker 
tracker) {
+        return new JobScopedIgnite(this, tracker, txManager, sql, 
asyncContinuationExecutor);
+    }
+
     private GroupStoragesContextResolver createGroupStoragesContextResolver() {
         Map<String, LogStorageManager> logStorageManagerByGroupName = Map.of(
                 PARTITION_GROUP_NAME, partitionsLogStorageManager,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/JobScopedIgnite.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/JobScopedIgnite.java
new file mode 100644
index 00000000000..74c6c85be72
--- /dev/null
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/JobScopedIgnite.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.app;
+
+import java.util.concurrent.Executor;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.catalog.IgniteCatalog;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
+import org.apache.ignite.internal.sql.api.JobScopedIgniteSql;
+import org.apache.ignite.internal.sql.api.PublicApiThreadingIgniteSql;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.tx.impl.PublicApiThreadingIgniteTransactions;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.network.IgniteCluster;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.table.IgniteTables;
+import org.apache.ignite.tx.IgniteTransactions;
+
+/**
+ * A lightweight wrapper around {@link IgniteImpl} that scopes {@link 
#transactions()} and {@link #sql()} to a per-job
+ * {@link HybridTimestampTracker}. This prevents one compute job from 
polluting another job's observable timestamp.
+ */
+class JobScopedIgnite implements Ignite, Wrapper {
+    private final Ignite delegate;
+
+    private final IgniteTransactions scopedTransactions;
+
+    private final IgniteSql scopedSql;
+
+    JobScopedIgnite(
+            Ignite delegate,
+            HybridTimestampTracker jobTracker,
+            TxManager txManager,
+            IgniteSqlImpl sql,
+            Executor asyncContinuationExecutor
+    ) {
+        this.delegate = delegate;
+        this.scopedTransactions = new PublicApiThreadingIgniteTransactions(
+                new IgniteTransactionsImpl(txManager, jobTracker), 
asyncContinuationExecutor
+        );
+        this.scopedSql = new PublicApiThreadingIgniteSql(
+                new JobScopedIgniteSql(sql, jobTracker), 
asyncContinuationExecutor
+        );
+    }
+
+    @Override
+    public String name() {
+        return delegate.name();
+    }
+
+    @Override
+    public IgniteTables tables() {
+        return delegate.tables();
+    }
+
+    @Override
+    public IgniteTransactions transactions() {
+        return scopedTransactions;
+    }
+
+    @Override
+    public IgniteSql sql() {
+        return scopedSql;
+    }
+
+    @Override
+    public IgniteCompute compute() {
+        return delegate.compute();
+    }
+
+    @Override
+    public IgniteCatalog catalog() {
+        return delegate.catalog();
+    }
+
+    @Override
+    public IgniteCluster cluster() {
+        return delegate.cluster();
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        return classToUnwrap.cast(delegate);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 958a2b301f3..0d6c1f12adf 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -259,7 +259,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
             @Nullable Object... arguments) {
         Objects.requireNonNull(statement);
 
-        CompletableFuture<AsyncResultSet<T>> future = 
executeAsync(transaction, mapper, statement, arguments);
+        CompletableFuture<AsyncResultSet<T>> future = 
executeAsync(transaction, mapper, cancellationToken, statement, arguments);
         return new SyncResultSetAdapter<>(sync(future));
     }
 
@@ -307,7 +307,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
             String query,
             @Nullable Object... arguments
     ) {
-        return executeAsyncInternal(transaction, cancellationToken, 
createStatement(query), arguments);
+        return executeAsync(transaction, cancellationToken, 
createStatement(query), arguments);
     }
 
     /** {@inheritDoc} */
@@ -318,7 +318,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
             Statement statement,
             @Nullable Object... arguments
     ) {
-        return executeAsyncInternal(transaction, cancellationToken, statement, 
arguments);
+        return executeAsyncInternal(observableTimestampTracker, transaction, 
cancellationToken, statement, arguments);
     }
 
     /** {@inheritDoc} */
@@ -346,7 +346,18 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
         throw new UnsupportedOperationException("Not implemented yet.");
     }
 
-    private CompletableFuture<AsyncResultSet<SqlRow>> executeAsyncInternal(
+    /**
+     * Executes a single SQL statement asynchronously using the given 
observable timestamp tracker.
+     *
+     * @param tracker Observable timestamp tracker to use.
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param statement SQL statement to execute.
+     * @param arguments Arguments for the statement.
+     * @return Operation future.
+     */
+    protected CompletableFuture<AsyncResultSet<SqlRow>> executeAsyncInternal(
+            HybridTimestampTracker tracker,
             @Nullable Transaction transaction,
             @Nullable CancellationToken cancellationToken,
             Statement statement,
@@ -369,7 +380,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
 
             result = queryProcessor.queryAsync(
                     properties,
-                    observableTimestampTracker,
+                    tracker,
                     (InternalTransaction) transaction,
                     cancellationToken,
                     statement.query(),
@@ -424,6 +435,27 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
             @Nullable CancellationToken cancellationToken,
             Statement statement,
             BatchedArguments batch
+    ) {
+        return executeBatchAsyncInternal(observableTimestampTracker, 
transaction, cancellationToken, statement, batch);
+    }
+
+    /**
+     * Executes a batched SQL statement asynchronously using the given 
observable timestamp tracker.
+     *
+     * @param tracker Observable timestamp tracker to use.
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param statement SQL statement to execute.
+     * @param batch List of batch rows, where each row is a list of statement 
arguments.
+     * @return Operation Future completed with the number of rows affected by 
each query in the batch
+     *         (if the batch succeeds), future completed with the {@link 
SqlBatchException} (if the batch fails).
+     */
+    protected CompletableFuture<long[]> executeBatchAsyncInternal(
+            HybridTimestampTracker tracker,
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            BatchedArguments batch
     ) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(nodeIsStoppingException());
@@ -434,7 +466,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
 
             return executeBatchCore(
                     queryProcessor,
-                    observableTimestampTracker,
+                    tracker,
                     (InternalTransaction) transaction,
                     cancellationToken,
                     statement.query(),
@@ -578,6 +610,24 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
     public CompletableFuture<Void> executeScriptAsync(
             @Nullable CancellationToken cancellationToken, String query,
             @Nullable Object... arguments
+    ) {
+        return executeScriptAsyncInternal(observableTimestampTracker, 
cancellationToken, query, arguments);
+    }
+
+    /**
+     * Executes a multi-statement SQL query using the given observable 
timestamp tracker.
+     *
+     * @param tracker Observable timestamp tracker to use.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @param query SQL query template.
+     * @param arguments Arguments for the template (optional).
+     * @return Operation future.
+     */
+    protected CompletableFuture<Void> executeScriptAsyncInternal(
+            HybridTimestampTracker tracker,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
     ) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(nodeIsStoppingException());
@@ -586,7 +636,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent, Wrapper {
         try {
             return executeScriptCore(
                     queryProcessor,
-                    observableTimestampTracker,
+                    tracker,
                     busyLock::enterBusy,
                     busyLock::leaveBusy,
                     query,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/JobScopedIgniteSql.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/JobScopedIgniteSql.java
new file mode 100644
index 00000000000..a8e8421118a
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/JobScopedIgniteSql.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static org.apache.ignite.internal.util.IgniteUtils.getInterruptibly;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestampTracker;
+import org.apache.ignite.internal.sql.SyncResultSetAdapter;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A scoped view of {@link IgniteSqlImpl} that uses a per-job {@link 
HybridTimestampTracker} instead of the node's global one.
+ * Delegates all operations to the original {@link IgniteSqlImpl} instance, 
sharing its lifecycle, busy lock, and cursor tracking.
+ */
+public class JobScopedIgniteSql implements IgniteSql, Wrapper {
+    private final IgniteSqlImpl delegate;
+
+    private final HybridTimestampTracker jobTracker;
+
+    public JobScopedIgniteSql(IgniteSqlImpl delegate, HybridTimestampTracker 
jobTracker) {
+        this.delegate = delegate;
+        this.jobTracker = jobTracker;
+    }
+
+    @Override
+    public Statement createStatement(String query) {
+        return delegate.createStatement(query);
+    }
+
+    @Override
+    public StatementBuilder statementBuilder() {
+        return delegate.statementBuilder();
+    }
+
+    @Override
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        Objects.requireNonNull(query);
+
+        return new SyncResultSetAdapter<>(sync(executeAsync(transaction, 
cancellationToken, query, arguments)));
+    }
+
+    @Override
+    public ResultSet<SqlRow> execute(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        Objects.requireNonNull(statement);
+
+        return new SyncResultSetAdapter<>(sync(executeAsync(transaction, 
cancellationToken, statement, arguments)));
+    }
+
+    @Override
+    public <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        Objects.requireNonNull(query);
+
+        return new SyncResultSetAdapter<>(sync(executeAsync(transaction, 
mapper, cancellationToken, query, arguments)));
+    }
+
+    @Override
+    public <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        Objects.requireNonNull(statement);
+
+        return new SyncResultSetAdapter<>(sync(executeAsync(transaction, 
mapper, cancellationToken, statement, arguments)));
+    }
+
+    @Override
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return executeAsync(transaction, cancellationToken, 
createStatement(query), arguments);
+    }
+
+    @Override
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        return delegate.executeAsyncInternal(jobTracker, transaction, 
cancellationToken, statement, arguments);
+    }
+
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        // TODO: IGNITE-18695.
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            @Nullable Object... arguments
+    ) {
+        // TODO: IGNITE-18695.
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    @Override
+    public long[] executeBatch(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String dmlQuery,
+            BatchedArguments batch
+    ) {
+        return sync(executeBatchAsync(transaction, cancellationToken, 
dmlQuery, batch));
+    }
+
+    @Override
+    public long[] executeBatch(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement dmlStatement,
+            BatchedArguments batch
+    ) {
+        return sync(executeBatchAsync(transaction, cancellationToken, 
dmlStatement, batch));
+    }
+
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            BatchedArguments batch
+    ) {
+        return executeBatchAsync(transaction, cancellationToken, 
createStatement(query), batch);
+    }
+
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(
+            @Nullable Transaction transaction,
+            @Nullable CancellationToken cancellationToken,
+            Statement statement,
+            BatchedArguments batch
+    ) {
+        return delegate.executeBatchAsyncInternal(jobTracker, transaction, 
cancellationToken, statement, batch);
+    }
+
+    @Override
+    public void executeScript(String query, @Nullable Object... arguments) {
+        executeScript(null, query, arguments);
+    }
+
+    @Override
+    public void executeScript(@Nullable CancellationToken cancellationToken, 
String query, @Nullable Object... arguments) {
+        Objects.requireNonNull(query);
+
+        sync(executeScriptAsync(cancellationToken, query, arguments));
+    }
+
+    @Override
+    public CompletableFuture<Void> executeScriptAsync(String query, @Nullable 
Object... arguments) {
+        return executeScriptAsync(null, query, arguments);
+    }
+
+    @Override
+    public CompletableFuture<Void> executeScriptAsync(
+            @Nullable CancellationToken cancellationToken,
+            String query,
+            @Nullable Object... arguments
+    ) {
+        return delegate.executeScriptAsyncInternal(jobTracker, 
cancellationToken, query, arguments);
+    }
+
+    private static <T> T sync(CompletableFuture<T> future) {
+        return getInterruptibly(future);
+    }
+
+    @Override
+    public <T> T unwrap(Class<T> classToUnwrap) {
+        return delegate.unwrap(classToUnwrap);
+    }
+}
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
index 93b061c6569..14ca977acdc 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java
@@ -80,4 +80,9 @@ public class IgniteTransactionsImpl implements 
IgniteTransactions {
     public Transaction beginWithPriority(boolean readOnly, TxPriority 
priority) {
         return txManager.beginExplicit(observableTimestampTracker, readOnly, 
InternalTxOptions.defaultsWithPriority(priority));
     }
+
+    @TestOnly
+    public HybridTimestampTracker observableTimestampTracker() {
+        return observableTimestampTracker;
+    }
 }

Reply via email to