This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f2c02191d0 IGNITE-21289 .NET: Implement job execution interface (#3159)
f2c02191d0 is described below

commit f2c02191d03505d592e8e76aa1b6494568668119
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Wed Feb 7 11:30:52 2024 +0200

    IGNITE-21289 .NET: Implement job execution interface (#3159)
    
    Add job execution API:
    * Return `IJobExecution<T>` from `ICompute` methods instead of a raw result
    * Implement `GetStatusAsync`, `CancelAsync`, `ChangePriorityAsync`
---
 .../java/org/apache/ignite/compute/JobState.java   |   2 +-
 .../java/org/apache/ignite/compute/JobStatus.java  |  73 +++++++++
 .../Compute/ComputeClusterAwarenessTests.cs        |  21 +--
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    | 173 ++++++++++++++++-----
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  22 ++-
 .../PartitionAwarenessRealClusterTests.cs          |   4 +-
 .../Proto/ColocationHashTests.cs                   |   8 +-
 .../Table/SchemaSynchronizationTest.cs             |   5 +-
 .../dotnet/Apache.Ignite/ClientOperationType.cs    |  17 +-
 .../dotnet/Apache.Ignite/Compute/ICompute.cs       |   8 +-
 .../dotnet/Apache.Ignite/Compute/IJobExecution.cs  |  67 ++++++++
 .../dotnet/Apache.Ignite/Compute/JobState.cs}      |  59 +++----
 .../dotnet/Apache.Ignite/Compute/JobStatus.cs}     |  51 ++----
 .../Apache.Ignite/Internal/Compute/Compute.cs      | 129 +++++++++++----
 .../Apache.Ignite/Internal/Compute/JobExecution.cs |  95 +++++++++++
 .../Apache.Ignite/Internal/Proto/ClientOp.cs       |  11 +-
 .../Internal/Proto/ClientOpExtensions.cs           |   3 +
 .../Internal/Proto/MsgPack/MsgPackReader.cs        |  15 ++
 .../dotnet/Apache.Ignite/RetryReadPolicy.cs        |   3 +
 19 files changed, 618 insertions(+), 148 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java 
b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
index 4c7d18a4cc..a60c025217 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
@@ -42,7 +42,7 @@ public enum JobState {
     COMPLETED,
 
     /**
-     * The job has received the cancel command, but it is still running.
+     * The job has received the cancel command, but is still running.
      */
     CANCELING,
 
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java 
b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
index 3ee4ac2910..e04f27c74e 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java
@@ -64,32 +64,67 @@ public class JobStatus implements Serializable {
         this.finishTime = builder.finishTime;
     }
 
+    /**
+     * Creates a new builder.
+     *
+     * @return Builder.
+     */
     public static Builder builder() {
         return new Builder();
     }
 
+    /**
+     * Returns job ID.
+     *
+     * @return Job ID.
+     */
     public UUID id() {
         return id;
     }
 
+    /**
+     * Returns job state.
+     *
+     * @return Job state.
+     */
     public JobState state() {
         return state;
     }
 
+    /**
+     * Returns job create time.
+     *
+     * @return Job create time.
+     */
     public Instant createTime() {
         return createTime;
     }
 
+    /**
+     * Returns job start time. {@code null} if the job has not started yet.
+     *
+     * @return Job start time. {@code null} if the job has not started yet.
+     */
     @Nullable
     public Instant startTime() {
         return startTime;
     }
 
+    /**
+     * Returns job finish time. {@code null} if the job has not finished yet.
+     *
+     * @return Job finish time. {@code null} if the job has not finished yet.
+     */
     @Nullable
     public Instant finishTime() {
         return finishTime;
     }
 
+    /**
+     * Returns a new builder with the same property values as this JobStatus.
+     *
+     * @return Builder.
+     */
     public Builder toBuilder() {
         return new Builder(this);
     }
@@ -106,6 +141,9 @@ public class JobStatus implements Serializable {
         @Nullable
         private Instant finishTime;
 
+        /**
+         * Constructor.
+         */
         public Builder() {
         }
 
@@ -117,31 +155,66 @@ public class JobStatus implements Serializable {
             this.finishTime = status.finishTime;
         }
 
+        /**
+         * Sets job ID.
+         *
+         * @param id Job ID.
+         * @return This builder.
+         */
         public Builder id(UUID id) {
             this.id = id;
             return this;
         }
 
+        /**
+         * Sets job state.
+         *
+         * @param state Job state.
+         * @return This builder.
+         */
         public Builder state(JobState state) {
             this.state = state;
             return this;
         }
 
+        /**
+         * Sets job create time.
+         *
+         * @param createTime Job create time.
+         * @return This builder.
+         */
         public Builder createTime(Instant createTime) {
             this.createTime = createTime;
             return this;
         }
 
+        /**
+         * Sets job start time.
+         *
+         * @param startTime Job start time.
+         * @return This builder.
+         */
         public Builder startTime(@Nullable Instant startTime) {
             this.startTime = startTime;
             return this;
         }
 
+        /**
+         * Sets job finish time.
+         *
+         * @param finishTime Job finish time.
+         * @return This builder.
+         */
         public Builder finishTime(@Nullable Instant finishTime) {
             this.finishTime = finishTime;
             return this;
         }
 
+        /**
+         * Builds a new JobStatus.
+         *
+         * @return JobStatus.
+         */
         public JobStatus build() {
             return new JobStatus(this);
         }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
index 7b1fede7cc..25ac8b6c99 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs
@@ -45,14 +45,14 @@ namespace Apache.Ignite.Tests.Compute
             using var client = await IgniteClient.StartAsync(clientCfg);
             client.WaitForConnections(3);
 
-            var res2 = await client.Compute.ExecuteAsync<string>(
+            IJobExecution<string> exec2 = await 
client.Compute.ExecuteAsync<string>(
                 new[] { server2.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
 
-            var res3 = await client.Compute.ExecuteAsync<string>(
+            IJobExecution<string> exec3 = await 
client.Compute.ExecuteAsync<string>(
                 new[] { server3.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
 
-            Assert.AreEqual("s2", res2);
-            Assert.AreEqual("s3", res3);
+            Assert.AreEqual("s2", await exec2.GetResultAsync());
+            Assert.AreEqual("s3", await exec3.GetResultAsync());
 
             Assert.AreEqual(ClientOp.ComputeExecute, 
server2.ClientOps.Single());
             Assert.AreEqual(ClientOp.ComputeExecute, 
server3.ClientOps.Single());
@@ -69,14 +69,14 @@ namespace Apache.Ignite.Tests.Compute
 
             using var client = await server1.ConnectClientAsync();
 
-            var res2 = await client.Compute.ExecuteAsync<string>(
+            IJobExecution<string> exec2 = await 
client.Compute.ExecuteAsync<string>(
                 new[] { server2.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
 
-            var res3 = await client.Compute.ExecuteAsync<string>(
+            IJobExecution<string> exec3 = await 
client.Compute.ExecuteAsync<string>(
                 new[] { server3.Node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
 
-            Assert.AreEqual("s1", res2);
-            Assert.AreEqual("s1", res3);
+            Assert.AreEqual("s1", await exec2.GetResultAsync());
+            Assert.AreEqual("s1", await exec3.GetResultAsync());
             Assert.AreEqual(new[] { ClientOp.ComputeExecute, 
ClientOp.ComputeExecute }, server1.ClientOps);
 
             Assert.IsEmpty(server2.ClientOps);
@@ -105,9 +105,12 @@ namespace Apache.Ignite.Tests.Compute
             for (int i = 0; i < 100; i++)
             {
                 var node = i % 2 == 0 ? server1.Node : server2.Node;
-                var res = await client.Compute.ExecuteAsync<string>(
+
+                IJobExecution<string> jobExecution = await 
client.Compute.ExecuteAsync<string>(
                     new[] { node }, Array.Empty<DeploymentUnit>(), 
jobClassName: string.Empty);
 
+                string res = await jobExecution.GetResultAsync();
+
                 nodeNames.Add(res);
             }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 145357c8b7..838265a57d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -26,6 +26,7 @@ namespace Apache.Ignite.Tests.Compute
     using System.Threading.Tasks;
     using Ignite.Compute;
     using Ignite.Table;
+    using Internal.Compute;
     using Internal.Network;
     using Internal.Proto;
     using Network;
@@ -86,14 +87,15 @@ namespace Apache.Ignite.Tests.Compute
             var res1 = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(0), Units, NodeNameJob, "-", 11);
             var res2 = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, NodeNameJob, ":", 22);
 
-            Assert.AreEqual(PlatformTestNodeRunner + "-_11", res1);
-            Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", res2);
+            Assert.AreEqual(PlatformTestNodeRunner + "-_11", await 
res1.GetResultAsync());
+            Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", await 
res2.GetResultAsync());
         }
 
         [Test]
         public async Task TestExecuteOnRandomNode()
         {
-            var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, NodeNameJob);
+            var jobExecution = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, NodeNameJob);
+            var res = await jobExecution.GetResultAsync();
 
             var expectedNodeNames = Enumerable.Range(1, 4)
                 .Select(x => x == 1 ? PlatformTestNodeRunner : 
PlatformTestNodeRunner + "_" + x)
@@ -103,10 +105,10 @@ namespace Apache.Ignite.Tests.Compute
         }
 
         [Test]
-        public void TestExecuteResultTypeMismatchThrowsInvalidCastException()
+        public async Task 
TestExecuteResultTypeMismatchThrowsInvalidCastException()
         {
-            Assert.ThrowsAsync<InvalidCastException>(async () =>
-                await Client.Compute.ExecuteAsync<Guid>(await 
Client.GetClusterNodesAsync(), Units, NodeNameJob));
+            var jobExecution = await Client.Compute.ExecuteAsync<Guid>(await 
Client.GetClusterNodesAsync(), Units, NodeNameJob);
+            Assert.ThrowsAsync<InvalidCastException>(async () => await 
jobExecution.GetResultAsync());
         }
 
         [Test]
@@ -114,13 +116,13 @@ namespace Apache.Ignite.Tests.Compute
         {
             var nodes = await GetNodeAsync(0);
 
-            IDictionary<IClusterNode, Task<string>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
+            IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
             var res = await taskMap[nodes[0]];
 
             Assert.AreEqual(1, taskMap.Count);
             Assert.AreSame(nodes[0], taskMap.Keys.Single());
 
-            Assert.AreEqual(PlatformTestNodeRunner + "123", res);
+            Assert.AreEqual(PlatformTestNodeRunner + "123", await 
res.GetResultAsync());
         }
 
         [Test]
@@ -128,7 +130,7 @@ namespace Apache.Ignite.Tests.Compute
         {
             var nodes = await Client.GetClusterNodesAsync();
 
-            IDictionary<IClusterNode, Task<string>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
+            IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap = 
Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123");
             var res1 = await taskMap[nodes[0]];
             var res2 = await taskMap[nodes[1]];
             var res3 = await taskMap[nodes[2]];
@@ -136,10 +138,10 @@ namespace Apache.Ignite.Tests.Compute
 
             Assert.AreEqual(4, taskMap.Count);
 
-            Assert.AreEqual(nodes[0].Name + "123", res1);
-            Assert.AreEqual(nodes[1].Name + "123", res2);
-            Assert.AreEqual(nodes[2].Name + "123", res3);
-            Assert.AreEqual(nodes[3].Name + "123", res4);
+            Assert.AreEqual(nodes[0].Name + "123", await 
res1.GetResultAsync());
+            Assert.AreEqual(nodes[1].Name + "123", await 
res2.GetResultAsync());
+            Assert.AreEqual(nodes[2].Name + "123", await 
res3.GetResultAsync());
+            Assert.AreEqual(nodes[3].Name + "123", await 
res4.GetResultAsync());
         }
 
         [Test]
@@ -147,7 +149,7 @@ namespace Apache.Ignite.Tests.Compute
         {
             var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, ConcatJob, 1.1, Guid.Empty, "3", null);
 
-            Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null", 
res);
+            Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null", 
await res.GetResultAsync());
         }
 
         [Test]
@@ -155,14 +157,14 @@ namespace Apache.Ignite.Tests.Compute
         {
             var res = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, ConcatJob, args: null);
 
-            Assert.IsNull(res);
+            Assert.IsNull(await res.GetResultAsync());
         }
 
         [Test]
-        public void TestJobErrorPropagatesToClientWithClassAndMessage()
+        public async Task TestJobErrorPropagatesToClientWithClassAndMessage()
         {
-            var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-                await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, ErrorJob, "unused"));
+            var jobExecution = await Client.Compute.ExecuteAsync<string>(await 
Client.GetClusterNodesAsync(), Units, ErrorJob, "unused");
+            var ex = Assert.ThrowsAsync<IgniteException>(async () => await 
jobExecution.GetResultAsync());
 
             StringAssert.Contains("Custom job error", ex!.Message);
 
@@ -230,7 +232,8 @@ namespace Apache.Ignite.Tests.Compute
             {
                 var nodes = await Client.GetClusterNodesAsync();
                 var str = expectedStr ?? val.ToString()!.Replace("E+", "E");
-                var res = await Client.Compute.ExecuteAsync<object>(nodes, 
Units, EchoJob, val, str);
+                IJobExecution<object> resExec = await 
Client.Compute.ExecuteAsync<object>(nodes, Units, EchoJob, val, str);
+                object res = await resExec.GetResultAsync();
 
                 Assert.AreEqual(val, res);
             }
@@ -268,9 +271,9 @@ namespace Apache.Ignite.Tests.Compute
             var nodeName = nodeIdx == 1 ? string.Empty : "_" + nodeIdx;
             var expectedNodeName = PlatformTestNodeRunner + nodeName;
 
-            Assert.AreEqual(expectedNodeName, resNodeName);
-            Assert.AreEqual(expectedNodeName, resNodeName2);
-            Assert.AreEqual(expectedNodeName, resNodeName3);
+            Assert.AreEqual(expectedNodeName, await 
resNodeName.GetResultAsync());
+            Assert.AreEqual(expectedNodeName, await 
resNodeName2.GetResultAsync());
+            Assert.AreEqual(expectedNodeName, await 
resNodeName3.GetResultAsync());
 
             // We only connect to 2 of 4 nodes because of different auth 
settings.
             if (nodeIdx < 3)
@@ -306,17 +309,22 @@ namespace Apache.Ignite.Tests.Compute
         {
             // Create table and use it in ExecuteColocated.
             var nodes = await GetNodeAsync(0);
-            var tableName = await Client.Compute.ExecuteAsync<string>(nodes, 
Units, CreateTableJob, "drop_me");
+            var tableNameExec = await 
Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, "drop_me");
+            var tableName = await tableNameExec.GetResultAsync();
 
             try
             {
                 var keyTuple = new IgniteTuple { [KeyCol] = 1L };
-                var resNodeName = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, 
NodeNameJob);
+                var resNodeNameExec = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, 
NodeNameJob);
+                var resNodeName = await resNodeNameExec.GetResultAsync();
 
                 // Drop table and create a new one with a different ID, then 
execute a computation again.
                 // This should update the cached table and complete the 
computation successfully.
-                await Client.Compute.ExecuteAsync<string>(nodes, Units, 
DropTableJob, tableName);
-                await Client.Compute.ExecuteAsync<string>(nodes, Units, 
CreateTableJob, tableName);
+                var dropExec = await 
Client.Compute.ExecuteAsync<string>(nodes, Units, DropTableJob, tableName);
+                await dropExec.GetResultAsync();
+
+                var createExec = await 
Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, tableName);
+                await createExec.GetResultAsync();
 
                 if (forceLoadAssignment)
                 {
@@ -324,21 +332,22 @@ namespace Apache.Ignite.Tests.Compute
                     table.SetFieldValue("_partitionAssignment", null);
                 }
 
-                var resNodeName2 = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, 
NodeNameJob);
+                var resNodeName2Exec = await 
Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, 
NodeNameJob);
+                var resNodeName2 = await resNodeName2Exec.GetResultAsync();
 
                 Assert.AreEqual(resNodeName, resNodeName2);
             }
             finally
             {
-                await Client.Compute.ExecuteAsync<string>(nodes, Units, 
DropTableJob, tableName);
+                await (await Client.Compute.ExecuteAsync<string>(nodes, Units, 
DropTableJob, tableName)).GetResultAsync();
             }
         }
 
         [Test]
-        public void 
TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace()
+        public async Task 
TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace()
         {
-            var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-                await Client.Compute.ExecuteAsync<object>(await 
GetNodeAsync(1), Units, ExceptionJob, "foo-bar"));
+            var jobExecution = await Client.Compute.ExecuteAsync<object>(await 
GetNodeAsync(1), Units, ExceptionJob, "foo-bar");
+            var ex = Assert.ThrowsAsync<IgniteException>(async () => await 
jobExecution.GetResultAsync());
 
             Assert.AreEqual("Test exception: foo-bar", ex!.Message);
             Assert.IsNotNull(ex.InnerException);
@@ -353,10 +362,10 @@ namespace Apache.Ignite.Tests.Compute
         }
 
         [Test]
-        public void TestCheckedExceptionInJobPropagatesToClient()
+        public async Task TestCheckedExceptionInJobPropagatesToClient()
         {
-            var ex = Assert.ThrowsAsync<IgniteException>(async () =>
-                await Client.Compute.ExecuteAsync<object>(await 
GetNodeAsync(1), Units, CheckedExceptionJob, "foo-bar"));
+            var jobExecution = await Client.Compute.ExecuteAsync<object>(await 
GetNodeAsync(1), Units, CheckedExceptionJob, "foo-bar");
+            var ex = Assert.ThrowsAsync<IgniteException>(async () => await 
jobExecution.GetResultAsync());
 
             Assert.AreEqual("TestCheckedEx: foo-bar", ex!.Message);
             Assert.IsNotNull(ex.InnerException);
@@ -377,18 +386,18 @@ namespace Apache.Ignite.Tests.Compute
             using var client = await server.ConnectClientAsync();
 
             var res = await client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), units, FakeServer.GetDetailsJob);
-            StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", 
res);
+            StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", 
await res.GetResultAsync());
 
             // Lazy enumerable.
             var res2 = await client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), units.Reverse(), FakeServer.GetDetailsJob);
-            StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest", 
res2);
+            StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest", 
await res2.GetResultAsync());
 
             // Colocated.
             var keyTuple = new IgniteTuple { ["ID"] = 1 };
             var res3 = await client.Compute.ExecuteColocatedAsync<string>(
                 FakeServer.ExistingTableName, keyTuple, units, 
FakeServer.GetDetailsJob);
 
-            StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", 
res3);
+            StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", 
await res3.GetResultAsync());
         }
 
         [Test]
@@ -466,7 +475,8 @@ namespace Apache.Ignite.Tests.Compute
             using var client = await IgniteClient.StartAsync(GetConfig());
 
             const int sleepMs = 3000;
-            var jobTask = client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, SleepJob, sleepMs);
+            var jobExecution = await client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, SleepJob, sleepMs);
+            var jobTask = jobExecution.GetResultAsync();
 
             // Wait a bit and close the connection.
             await Task.Delay(10);
@@ -476,6 +486,93 @@ namespace Apache.Ignite.Tests.Compute
             Assert.AreEqual("Connection closed.", ex!.Message);
         }
 
+        [Test]
+        public async Task TestJobExecutionStatusExecuting()
+        {
+            const int sleepMs = 3000;
+            var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+            var jobExecution = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, SleepJob, sleepMs);
+
+            await AssertJobStatus(jobExecution, JobState.Executing, 
beforeStart);
+        }
+
+        [Test]
+        public async Task TestJobExecutionStatusCompleted()
+        {
+            const int sleepMs = 1;
+            var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+            var jobExecution = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, SleepJob, sleepMs);
+            await jobExecution.GetResultAsync();
+
+            await AssertJobStatus(jobExecution, JobState.Completed, 
beforeStart);
+        }
+
+        [Test]
+        public async Task TestJobExecutionStatusFailed()
+        {
+            var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+            var jobExecution = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, ErrorJob, "unused");
+            Assert.CatchAsync(async () => await jobExecution.GetResultAsync());
+
+            await AssertJobStatus(jobExecution, JobState.Failed, beforeStart);
+        }
+
+        [Test]
+        public async Task TestJobExecutionStatusNull()
+        {
+            var fakeJobExecution = new JobExecution<int>(
+                Guid.NewGuid(), Task.FromException<(int, JobStatus)>(new 
Exception("x")), (Compute)Client.Compute);
+
+            var status = await fakeJobExecution.GetStatusAsync();
+
+            Assert.IsNull(status);
+        }
+
+        [Test]
+        public async Task TestJobExecutionCancel()
+        {
+            const int sleepMs = 5000;
+            var beforeStart = SystemClock.Instance.GetCurrentInstant();
+
+            var jobExecution = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, SleepJob, sleepMs);
+            await jobExecution.CancelAsync();
+
+            await AssertJobStatus(jobExecution, JobState.Canceled, 
beforeStart);
+        }
+
+        [Test]
+        public async Task TestChangePriority()
+        {
+            var jobExecution = await Client.Compute.ExecuteAsync<string>(await 
GetNodeAsync(1), Units, SleepJob, 5000);
+            var res = await jobExecution.ChangePriorityAsync(123);
+
+            // Job exists, but is already executing.
+            Assert.IsFalse(res);
+        }
+
+        private static async Task AssertJobStatus<T>(IJobExecution<T> 
jobExecution, JobState state, Instant beforeStart)
+        {
+            JobStatus? status = await jobExecution.GetStatusAsync();
+
+            Assert.IsNotNull(status);
+            Assert.AreEqual(jobExecution.Id, status!.Id);
+            Assert.AreEqual(state, status.State);
+            Assert.Greater(status.CreateTime, beforeStart);
+            Assert.Greater(status.StartTime, status.CreateTime);
+
+            if (state is JobState.Canceled or JobState.Completed or 
JobState.Failed)
+            {
+                Assert.Greater(status.FinishTime, status.StartTime);
+            }
+            else
+            {
+                Assert.IsNull(status.FinishTime);
+            }
+        }
+
         private async Task<List<IClusterNode>> GetNodeAsync(int index) =>
             (await Client.GetClusterNodesAsync()).OrderBy(n => 
n.Name).Skip(index).Take(1).ToList();
     }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 493a6ab897..dc1a974633 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -299,7 +299,19 @@ namespace Apache.Ignite.Tests
                     case ClientOp.ComputeExecuteColocated:
                     {
                         using var pooledArrayBuffer = ComputeExecute(reader, 
colocated: opCode == ClientOp.ComputeExecuteColocated);
-                        Send(handler, requestId, ReadOnlyMemory<byte>.Empty);
+
+                        using var resWriter = new PooledArrayBuffer();
+
+                        var rw = resWriter.MessageWriter;
+                        if (opCode == ClientOp.ComputeExecuteColocated)
+                        {
+                            // Schema version.
+                            rw.Write(1);
+                        }
+
+                        rw.Write(Guid.NewGuid());
+
+                        Send(handler, requestId, resWriter);
                         Send(handler, requestId, pooledArrayBuffer, 
isNotification: true);
                         continue;
                     }
@@ -656,6 +668,14 @@ namespace Apache.Ignite.Tests
 
             writer.Write(builder.Build().Span);
 
+            // Status
+            writer.Write(Guid.NewGuid());
+            writer.Write(0); // State.
+            writer.Write(0L); // Create time.
+            writer.Write(0);
+            writer.WriteNil(); // Start time.
+            writer.WriteNil(); // Finish time.
+
             return arrayBufferWriter;
         }
 
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
index cba89ada6a..2c24bb3ebb 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs
@@ -50,12 +50,14 @@ public class PartitionAwarenessRealClusterTests : 
IgniteTestsBase
         {
             var keyTuple = new IgniteTuple { ["KEY"] = key };
 
-            var primaryNodeName = await 
client.Compute.ExecuteColocatedAsync<string>(
+            var primaryNodeNameExec = await 
client.Compute.ExecuteColocatedAsync<string>(
                 TableName,
                 keyTuple,
                 Array.Empty<DeploymentUnit>(),
                 ComputeTests.NodeNameJob);
 
+            var primaryNodeName = await primaryNodeNameExec.GetResultAsync();
+
             if (primaryNodeName.EndsWith("_3", StringComparison.Ordinal) || 
primaryNodeName.EndsWith("_4", StringComparison.Ordinal))
             {
                 // Skip nodes without direct client connection.
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
index 4539084c7c..3e0d6282b4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs
@@ -194,13 +194,15 @@ public class ColocationHashTests : IgniteTestsBase
             using var writer = ProtoCommon.GetMessageWriter();
             var clientColocationHash = ser.Write(writer, null, schema, key);
 
-            var serverColocationHash = await Client.Compute.ExecuteAsync<int>(
+            var serverColocationHashExec = await 
Client.Compute.ExecuteAsync<int>(
                 clusterNodes,
                 Array.Empty<DeploymentUnit>(),
                 TableRowColocationHashJob,
                 tableName,
                 i);
 
+            var serverColocationHash = await 
serverColocationHashExec.GetResultAsync();
+
             Assert.AreEqual(serverColocationHash, clientColocationHash, 
key.ToString());
         }
     }
@@ -327,7 +329,7 @@ public class ColocationHashTests : IgniteTestsBase
     {
         var nodes = await Client.GetClusterNodesAsync();
 
-        return await Client.Compute.ExecuteAsync<int>(
+        IJobExecution<int> jobExecution = await 
Client.Compute.ExecuteAsync<int>(
             nodes,
             Array.Empty<DeploymentUnit>(),
             ColocationHashJob,
@@ -335,6 +337,8 @@ public class ColocationHashTests : IgniteTestsBase
             bytes,
             timePrecision,
             timestampPrecision);
+
+        return await jobExecution.GetResultAsync();
     }
 
     private record TestIndexProvider(Func<int, int> ColumnOrderDelegate, int 
HashedColumnCount) : IHashedColumnIndexProvider
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
index b5ff24bccf..f164946556 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs
@@ -292,8 +292,11 @@ public class SchemaSynchronizationTest : IgniteTestsBase
                 break;
 
             case TestMode.Compute:
-                await Client.Compute.ExecuteColocatedAsync<string, Poco>(
+                var jobExecution = await 
Client.Compute.ExecuteColocatedAsync<string, Poco>(
                     table.Name, new Poco(1, "foo"), 
Array.Empty<DeploymentUnit>(), ComputeTests.NodeNameJob);
+
+                await jobExecution.GetResultAsync();
+
                 break;
 
             default:
diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs 
b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
index 7e26b024f8..78f7c130b2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs
@@ -129,6 +129,21 @@ namespace Apache.Ignite
         /// <summary>
         /// SQL script (<see cref="ISql.ExecuteScriptAsync"/>).
         /// </summary>
-        SqlExecuteScript
+        SqlExecuteScript,
+
+        /// <summary>
+        /// Get status of a compute job (<see 
cref="IJobExecution{T}.GetStatusAsync"/>).
+        /// </summary>
+        ComputeGetStatus,
+
+        /// <summary>
+        /// Cancel compute job (<see cref="IJobExecution{T}.CancelAsync"/>).
+        /// </summary>
+        ComputeCancel,
+
+        /// <summary>
+        /// Change compute job priority (<see 
cref="IJobExecution{T}.ChangePriorityAsync"/>).
+        /// </summary>
+        ComputeChangePriority
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
index 5175b475cf..d8b52db983 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs
@@ -36,7 +36,7 @@ public interface ICompute
     /// <param name="args">Job arguments.</param>
     /// <typeparam name="T">Job result type.</typeparam>
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-    Task<T> ExecuteAsync<T>(
+    Task<IJobExecution<T>> ExecuteAsync<T>(
         IEnumerable<IClusterNode> nodes,
         IEnumerable<DeploymentUnit> units,
         string jobClassName,
@@ -52,7 +52,7 @@ public interface ICompute
     /// <param name="args">Job arguments.</param>
     /// <typeparam name="T">Job result type.</typeparam>
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-    Task<T> ExecuteColocatedAsync<T>(
+    Task<IJobExecution<T>> ExecuteColocatedAsync<T>(
         string tableName,
         IIgniteTuple key,
         IEnumerable<DeploymentUnit> units,
@@ -70,7 +70,7 @@ public interface ICompute
     /// <typeparam name="T">Job result type.</typeparam>
     /// <typeparam name="TKey">Key type.</typeparam>
     /// <returns>A <see cref="Task"/> representing the asynchronous 
operation.</returns>
-    Task<T> ExecuteColocatedAsync<T, TKey>(
+    Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>(
         string tableName,
         TKey key,
         IEnumerable<DeploymentUnit> units,
@@ -87,7 +87,7 @@ public interface ICompute
     /// <param name="args">Job arguments.</param>
     /// <typeparam name="T">Job result type.</typeparam>
     /// <returns>A map of <see cref="Task"/> representing the asynchronous 
operation for every node.</returns>
-    IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+    IDictionary<IClusterNode, Task<IJobExecution<T>>> BroadcastAsync<T>(
         IEnumerable<IClusterNode> nodes,
         IEnumerable<DeploymentUnit> units,
         string jobClassName,
diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
new file mode 100644
index 0000000000..5d050a1c2a
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Compute;
+
+using System;
+using System.Threading.Tasks;
+
+/// <summary>
+/// Job control object, provides information about the job execution process 
and result, allows cancelling the job.
+/// </summary>
+/// <typeparam name="T">Job result type.</typeparam>
+public interface IJobExecution<T>
+{
+    /// <summary>
+    /// Gets the job ID.
+    /// </summary>
+    Guid Id { get; }
+
+    /// <summary>
+    /// Gets the job execution result.
+    /// </summary>
+    /// <returns>Job execution result.</returns>
+    Task<T> GetResultAsync();
+
+    /// <summary>
+    /// Gets the job execution status. Can be <c>null</c> if the job status no 
longer exists due to exceeding the retention time limit.
+    /// </summary>
+    /// <returns>
+    /// Job execution status. Can be <c>null</c> if the job status no longer 
exists due to exceeding the retention time limit.
+    /// </returns>
+    Task<JobStatus?> GetStatusAsync();
+
+    /// <summary>
+    /// Cancels the job execution.
+    /// </summary>
+    /// <returns>
+    /// Returns <c>true</c> if the job was successfully cancelled, 
<c>false</c> if the job has already finished,
+    /// <c>null</c> if the job was not found (no longer exists due to 
exceeding the retention time limit).
+    /// </returns>
+    Task<bool?> CancelAsync();
+
+    /// <summary>
+    /// Changes the job priority. After priority change the job will be the 
last in the queue of jobs with the same priority.
+    /// </summary>
+    /// <param name="priority">New priority.</param>
+    /// <returns>
+    /// Returns <c>true</c> if the priority was successfully changed,
+    /// <c>false</c> when the priority couldn't be changed (job is already 
executing or completed),
+    /// <c>null</c> if the job was not found (no longer exists due to 
exceeding the retention time limit).
+    /// </returns>
+    Task<bool?> ChangePriorityAsync(int priority);
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java 
b/modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs
similarity index 51%
copy from modules/api/src/main/java/org/apache/ignite/compute/JobState.java
copy to modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs
index 4c7d18a4cc..ada84bf8e6 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs
@@ -15,39 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.compute;
+namespace Apache.Ignite.Compute;
 
-/**
- * Compute job's state enum.
- */
-public enum JobState {
-    /**
-     * The job is submitted and waiting for an execution start.
-     */
-    QUEUED,
+/// <summary>
+/// Compute job state.
+/// </summary>
+public enum JobState
+{
+    /// <summary>
+    /// The job is submitted and waiting for an execution start.
+    /// </summary>
+    Queued,
 
-    /**
-     * The job is being executed.
-     */
-    EXECUTING,
+    /// <summary>
+    /// The job is being executed.
+    /// </summary>
+    Executing,
 
-    /**
-     * The job was unexpectedly terminated during execution.
-     */
-    FAILED,
+    /// <summary>
+    /// The job was unexpectedly terminated during execution.
+    /// </summary>
+    Failed,
 
-    /**
-     * The job was executed successfully and the execution result was returned.
-     */
-    COMPLETED,
+    /// <summary>
+    /// The job was executed successfully and the execution result was 
returned.
+    /// </summary>
+    Completed,
 
-    /**
-     * The job has received the cancel command, but it is still running.
-     */
-    CANCELING,
+    /// <summary>
+    /// The job has received the cancel command, but is still running.
+    /// </summary>
+    Canceling,
 
-    /**
-     * The job was successfully cancelled.
-     */
-    CANCELED;
+    /// <summary>
+    /// The job was successfully cancelled.
+    /// </summary>
+    Canceled
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java 
b/modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs
similarity index 55%
copy from modules/api/src/main/java/org/apache/ignite/compute/JobState.java
copy to modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs
index 4c7d18a4cc..b5bb9232d8 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java
+++ b/modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs
@@ -15,39 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.compute;
+namespace Apache.Ignite.Compute;
 
-/**
- * Compute job's state enum.
- */
-public enum JobState {
-    /**
-     * The job is submitted and waiting for an execution start.
-     */
-    QUEUED,
-
-    /**
-     * The job is being executed.
-     */
-    EXECUTING,
-
-    /**
-     * The job was unexpectedly terminated during execution.
-     */
-    FAILED,
-
-    /**
-     * The job was executed successfully and the execution result was returned.
-     */
-    COMPLETED,
-
-    /**
-     * The job has received the cancel command, but it is still running.
-     */
-    CANCELING,
+using System;
+using NodaTime;
 
-    /**
-     * The job was successfully cancelled.
-     */
-    CANCELED;
-}
+/// <summary>
+/// Compute job status.
+/// </summary>
+/// <param name="Id">Job ID.</param>
+/// <param name="State">State.</param>
+/// <param name="CreateTime">Create time.</param>
+/// <param name="StartTime">Start time (<c>null</c> when not yet 
started).</param>
+/// <param name="FinishTime">Finish time (<c>null</c> when not yet 
finished).</param>
+public sealed record JobStatus(
+    Guid Id,
+    JobState State,
+    Instant CreateTime,
+    Instant? StartTime,
+    Instant? FinishTime);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 5d938590d0..208e4ee04d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -60,7 +60,7 @@ namespace Apache.Ignite.Internal.Compute
         }
 
         /// <inheritdoc/>
-        public async Task<T> ExecuteAsync<T>(
+        public async Task<IJobExecution<T>> ExecuteAsync<T>(
             IEnumerable<IClusterNode> nodes,
             IEnumerable<DeploymentUnit> units,
             string jobClassName,
@@ -76,7 +76,7 @@ namespace Apache.Ignite.Internal.Compute
         }
 
         /// <inheritdoc/>
-        public async Task<T> ExecuteColocatedAsync<T>(
+        public async Task<IJobExecution<T>> ExecuteColocatedAsync<T>(
             string tableName,
             IIgniteTuple key,
             IEnumerable<DeploymentUnit> units,
@@ -92,7 +92,7 @@ namespace Apache.Ignite.Internal.Compute
                 .ConfigureAwait(false);
 
         /// <inheritdoc/>
-        public async Task<T> ExecuteColocatedAsync<T, TKey>(
+        public async Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>(
             string tableName,
             TKey key,
             IEnumerable<DeploymentUnit> units,
@@ -109,7 +109,7 @@ namespace Apache.Ignite.Internal.Compute
                 .ConfigureAwait(false);
 
         /// <inheritdoc/>
-        public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>(
+        public IDictionary<IClusterNode, Task<IJobExecution<T>>> 
BroadcastAsync<T>(
             IEnumerable<IClusterNode> nodes,
             IEnumerable<DeploymentUnit> units,
             string jobClassName,
@@ -119,12 +119,12 @@ namespace Apache.Ignite.Internal.Compute
             IgniteArgumentCheck.NotNull(jobClassName);
             IgniteArgumentCheck.NotNull(units);
 
-            var res = new Dictionary<IClusterNode, Task<T>>();
+            var res = new Dictionary<IClusterNode, Task<IJobExecution<T>>>();
             var units0 = units as ICollection<DeploymentUnit> ?? 
units.ToList(); // Avoid multiple enumeration.
 
             foreach (var node in nodes)
             {
-                var task = ExecuteOnNodes<T>(new[] { node }, units0, 
jobClassName, args);
+                Task<IJobExecution<T>> task = ExecuteOnNodes<T>(new[] { node 
}, units0, jobClassName, args);
 
                 res[node] = task;
             }
@@ -135,6 +135,59 @@ namespace Apache.Ignite.Internal.Compute
         /// <inheritdoc/>
         public override string ToString() => 
IgniteToStringBuilder.Build(GetType());
 
+        /// <summary>
+        /// Gets the job status.
+        /// </summary>
+        /// <param name="jobId">Job ID.</param>
+        /// <returns>Status.</returns>
+        internal async Task<JobStatus?> GetJobStatusAsync(Guid jobId)
+        {
+            using var writer = ProtoCommon.GetMessageWriter();
+            writer.MessageWriter.Write(jobId);
+
+            using var res = await 
_socket.DoOutInOpAsync(ClientOp.ComputeGetStatus, writer).ConfigureAwait(false);
+            return Read(res.GetReader());
+
+            JobStatus? Read(MsgPackReader reader) => reader.TryReadNil() ? 
null : ReadJobStatus(reader);
+        }
+
+        /// <summary>
+        /// Cancels the job.
+        /// </summary>
+        /// <param name="jobId">Job id.</param>
+        /// <returns>
+        /// <c>true</c> when the job is cancelled, <c>false</c> when the job 
couldn't be cancelled
+        /// (either it's not yet started, or it's already completed), or <c> 
null</c> if there's no job with the specified id.
+        /// </returns>
+        internal async Task<bool?> CancelJobAsync(Guid jobId)
+        {
+            using var writer = ProtoCommon.GetMessageWriter();
+            writer.MessageWriter.Write(jobId);
+
+            using var res = await 
_socket.DoOutInOpAsync(ClientOp.ComputeCancel, writer).ConfigureAwait(false);
+            return res.GetReader().ReadBooleanNullable();
+        }
+
+        /// <summary>
+        /// Changes the job priority. After priority change the job will be 
the last in the queue of jobs with the same priority.
+        /// </summary>
+        /// <param name="jobId">Job id.</param>
+        /// <param name="priority">New priority.</param>
+        /// <returns>
+        /// Returns <c>true</c> if the priority was successfully changed,
+        /// <c>false</c> when the priority couldn't be changed (job is already 
executing or completed),
+        /// <c>null</c> if the job was not found (no longer exists due to 
exceeding the retention time limit).
+        /// </returns>
+        internal async Task<bool?> ChangeJobPriorityAsync(Guid jobId, int 
priority)
+        {
+            using var writer = ProtoCommon.GetMessageWriter();
+            writer.MessageWriter.Write(jobId);
+            writer.MessageWriter.Write(priority);
+
+            using var res = await 
_socket.DoOutInOpAsync(ClientOp.ComputeChangePriority, 
writer).ConfigureAwait(false);
+            return res.GetReader().ReadBooleanNullable();
+        }
+
         [SuppressMessage("Security", "CA5394:Do not use insecure randomness", 
Justification = "Secure random is not required here.")]
         private static IClusterNode GetRandomNode(ICollection<IClusterNode> 
nodes)
         {
@@ -198,7 +251,47 @@ namespace Apache.Ignite.Internal.Compute
             });
         }
 
-        private async Task<T> ExecuteOnNodes<T>(
+        private static JobStatus ReadJobStatus(MsgPackReader reader)
+        {
+            var id = reader.ReadGuid();
+            var state = (JobState)reader.ReadInt32();
+            var createTime = reader.ReadInstantNullable();
+            var startTime = reader.ReadInstantNullable();
+            var endTime = reader.ReadInstantNullable();
+
+            return new JobStatus(id, state, createTime.GetValueOrDefault(), 
startTime, endTime);
+        }
+
+        private IJobExecution<T> GetJobExecution<T>(PooledBuffer 
computeExecuteResult, bool readSchema)
+        {
+            var reader = computeExecuteResult.GetReader();
+
+            if (readSchema)
+            {
+                _ = reader.ReadInt32();
+            }
+
+            var jobId = reader.ReadGuid();
+            var resultTask = 
GetResult((NotificationHandler)computeExecuteResult.Metadata!);
+
+            return new JobExecution<T>(jobId, resultTask, this);
+
+            static async Task<(T, JobStatus)> GetResult(NotificationHandler 
handler)
+            {
+                using var notificationRes = await 
handler.Task.ConfigureAwait(false);
+                return Read(notificationRes.GetReader());
+            }
+
+            static (T, JobStatus) Read(MsgPackReader reader)
+            {
+                var res = (T)reader.ReadObjectFromBinaryTuple()!;
+                var status = ReadJobStatus(reader);
+
+                return (res, status);
+            }
+        }
+
+        private async Task<IJobExecution<T>> ExecuteOnNodes<T>(
             ICollection<IClusterNode> nodes,
             IEnumerable<DeploymentUnit> units,
             string jobClassName,
@@ -213,9 +306,7 @@ namespace Apache.Ignite.Internal.Compute
                     ClientOp.ComputeExecute, writer, 
PreferredNode.FromName(node.Name), expectNotifications: true)
                 .ConfigureAwait(false);
 
-            var notificationHandler = (NotificationHandler)res.Metadata!;
-            using var notificationRes = await 
notificationHandler.Task.ConfigureAwait(false);
-            return Read(notificationRes);
+            return GetJobExecution<T>(res, readSchema: false);
 
             void Write()
             {
@@ -231,13 +322,6 @@ namespace Apache.Ignite.Internal.Compute
 
                 w.WriteObjectCollectionAsBinaryTuple(args);
             }
-
-            static T Read(in PooledBuffer buf)
-            {
-                var reader = buf.GetReader();
-
-                return (T)reader.ReadObjectFromBinaryTuple()!;
-            }
         }
 
         private async Task<Table> GetTableAsync(string tableName)
@@ -261,7 +345,7 @@ namespace Apache.Ignite.Internal.Compute
         }
 
         [SuppressMessage("Maintainability", "CA1508:Avoid dead conditional 
code", Justification = "False positive")]
-        private async Task<T> ExecuteColocatedAsync<T, TKey>(
+        private async Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>(
             string tableName,
             TKey key,
             Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc,
@@ -292,9 +376,7 @@ namespace Apache.Ignite.Internal.Compute
                             ClientOp.ComputeExecuteColocated, bufferWriter, 
preferredNode, expectNotifications: true)
                         .ConfigureAwait(false);
 
-                    var notificationHandler = 
(NotificationHandler)res.Metadata!;
-                    using var notificationRes = await 
notificationHandler.Task.ConfigureAwait(false);
-                    return Read(notificationRes);
+                    return GetJobExecution<T>(res, readSchema: true);
                 }
                 catch (IgniteException e) when (e.Code == 
ErrorGroups.Client.TableIdNotFound)
                 {
@@ -336,11 +418,6 @@ namespace Apache.Ignite.Internal.Compute
 
                 return colocationHash;
             }
-
-            static T Read(in PooledBuffer buf)
-            {
-                return (T)buf.GetReader().ReadObjectFromBinaryTuple()!;
-            }
         }
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
new file mode 100644
index 0000000000..441f7068cd
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Internal.Compute;
+
+using System;
+using System.Threading.Tasks;
+using Ignite.Compute;
+
+/// <summary>
+/// Job execution.
+/// </summary>
+/// <typeparam name="T">Job result type.</typeparam>
+internal sealed record JobExecution<T> : IJobExecution<T>
+{
+    private readonly Task<(T Result, JobStatus Status)> _resultTask;
+
+    private readonly Compute _compute;
+
+    private volatile JobStatus? _finalStatus;
+
+    /// <summary>
+    /// Initializes a new instance of the <see cref="JobExecution{T}"/> class.
+    /// </summary>
+    /// <param name="id">Job id.</param>
+    /// <param name="resultTask">Result task.</param>
+    /// <param name="compute">Compute.</param>
+    public JobExecution(Guid id, Task<(T Result, JobStatus Status)> 
resultTask, Compute compute)
+    {
+        Id = id;
+        _resultTask = resultTask;
+        _compute = compute;
+
+        // Wait for completion in background and cache the status.
+        _ = CacheStatusOnCompletion();
+    }
+
+    /// <inheritdoc/>
+    public Guid Id { get; }
+
+    /// <inheritdoc/>
+    public async Task<T> GetResultAsync()
+    {
+        var (result, _) = await _resultTask.ConfigureAwait(false);
+        return result;
+    }
+
+    /// <inheritdoc/>
+    public async Task<JobStatus?> GetStatusAsync()
+    {
+        var finalStatus = _finalStatus;
+        if (finalStatus != null)
+        {
+            return finalStatus;
+        }
+
+        var status = await 
_compute.GetJobStatusAsync(Id).ConfigureAwait(false);
+        if (status is { State: JobState.Completed or JobState.Failed or 
JobState.Canceled })
+        {
+            // Can't be transitioned to another state, cache it.
+            _finalStatus = status;
+        }
+
+        return status;
+    }
+
+    /// <inheritdoc/>
+    public async Task<bool?> CancelAsync() =>
+        await _compute.CancelJobAsync(Id).ConfigureAwait(false);
+
+    /// <inheritdoc/>
+    public async Task<bool?> ChangePriorityAsync(int priority) =>
+        await _compute.ChangeJobPriorityAsync(Id, 
priority).ConfigureAwait(false);
+
+    private async Task CacheStatusOnCompletion()
+    {
+        var (_, status) = await _resultTask.ConfigureAwait(false);
+
+        _finalStatus = status;
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
index 213990c209..80b3cecdfd 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs
@@ -119,6 +119,15 @@ namespace Apache.Ignite.Internal.Proto
         SqlExecScript = 56,
 
         /** SQL parameter metadata. */
-        SqlParamMeta = 57
+        SqlParamMeta = 57,
+
+        /** Get compute job status. */
+        ComputeGetStatus = 59,
+
+        /** Cancel compute job. */
+        ComputeCancel = 60,
+
+        /** Change compute job priority. */
+        ComputeChangePriority = 61
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
index 71c0803cf8..abab50c462 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs
@@ -54,6 +54,9 @@ namespace Apache.Ignite.Internal.Proto
                 ClientOp.TupleContainsKey => 
ClientOperationType.TupleContainsKey,
                 ClientOp.ComputeExecute => ClientOperationType.ComputeExecute,
                 ClientOp.ComputeExecuteColocated => 
ClientOperationType.ComputeExecute,
+                ClientOp.ComputeGetStatus => 
ClientOperationType.ComputeGetStatus,
+                ClientOp.ComputeCancel => ClientOperationType.ComputeCancel,
+                ClientOp.ComputeChangePriority => 
ClientOperationType.ComputeChangePriority,
                 ClientOp.SqlExec => ClientOperationType.SqlExecute,
                 ClientOp.SqlExecScript => ClientOperationType.SqlExecuteScript,
                 ClientOp.SqlCursorNextPage => null,
diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
index e86f3defa1..30357ca7bc 100644
--- 
a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
+++ 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs
@@ -22,6 +22,7 @@ using System.Buffers.Binary;
 using System.IO;
 using BinaryTuple;
 using Ignite.Sql;
+using NodaTime;
 
 /// <summary>
 /// MsgPack reader.
@@ -80,6 +81,12 @@ internal ref struct MsgPackReader
             var invalid => throw GetInvalidCodeException("bool", invalid)
         };
 
+    /// <summary>
+    /// Reads a nullable boolean value.
+    /// </summary>
+    /// <returns>The value.</returns>
+    public bool ReadBooleanNullable() => TryReadNil() ? default : 
ReadBoolean();
+
     /// <summary>
     /// Reads a short value.
     /// </summary>
@@ -214,6 +221,14 @@ internal ref struct MsgPackReader
         return UuidSerializer.Read(GetSpan(16));
     }
 
+    /// <summary>
+    /// Reads Instant value.
+    /// </summary>
+    /// <returns>Instant.</returns>
+    public Instant? ReadInstantNullable() => TryReadNil()
+        ? null
+        : 
Instant.FromUnixTimeSeconds(ReadInt64()).PlusNanoseconds(ReadInt32());
+
     /// <summary>
     /// Skips a value.
     /// </summary>
diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs 
b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
index 925f04afb2..41dcbcfa53 100644
--- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs
@@ -58,6 +58,9 @@ namespace Apache.Ignite
                 ClientOperationType.ComputeExecute => false,
                 ClientOperationType.SqlExecute => false,
                 ClientOperationType.SqlExecuteScript => false,
+                ClientOperationType.ComputeCancel => false,
+                ClientOperationType.ComputeChangePriority => false,
+                ClientOperationType.ComputeGetStatus => true,
                 var unsupported => throw new 
NotSupportedException("Unsupported operation type: " + unsupported)
             };
         }

Reply via email to