This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new f2c02191d0 IGNITE-21289 .NET: Implement job execution interface (#3159) f2c02191d0 is described below commit f2c02191d03505d592e8e76aa1b6494568668119 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Wed Feb 7 11:30:52 2024 +0200 IGNITE-21289 .NET: Implement job execution interface (#3159) Add job execution API: * Return `IJobExecution<T>` from `ICompute` methods instead of a raw result * Implement `GetStatusAsync`, `CancelAsync`, `ChangePriorityAsync` --- .../java/org/apache/ignite/compute/JobState.java | 2 +- .../java/org/apache/ignite/compute/JobStatus.java | 73 +++++++++ .../Compute/ComputeClusterAwarenessTests.cs | 21 +-- .../Apache.Ignite.Tests/Compute/ComputeTests.cs | 173 ++++++++++++++++----- .../dotnet/Apache.Ignite.Tests/FakeServer.cs | 22 ++- .../PartitionAwarenessRealClusterTests.cs | 4 +- .../Proto/ColocationHashTests.cs | 8 +- .../Table/SchemaSynchronizationTest.cs | 5 +- .../dotnet/Apache.Ignite/ClientOperationType.cs | 17 +- .../dotnet/Apache.Ignite/Compute/ICompute.cs | 8 +- .../dotnet/Apache.Ignite/Compute/IJobExecution.cs | 67 ++++++++ .../dotnet/Apache.Ignite/Compute/JobState.cs} | 59 +++---- .../dotnet/Apache.Ignite/Compute/JobStatus.cs} | 51 ++---- .../Apache.Ignite/Internal/Compute/Compute.cs | 129 +++++++++++---- .../Apache.Ignite/Internal/Compute/JobExecution.cs | 95 +++++++++++ .../Apache.Ignite/Internal/Proto/ClientOp.cs | 11 +- .../Internal/Proto/ClientOpExtensions.cs | 3 + .../Internal/Proto/MsgPack/MsgPackReader.cs | 15 ++ .../dotnet/Apache.Ignite/RetryReadPolicy.cs | 3 + 19 files changed, 618 insertions(+), 148 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java index 4c7d18a4cc..a60c025217 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/JobState.java @@ -42,7 
+42,7 @@ public enum JobState { COMPLETED, /** - * The job has received the cancel command, but it is still running. + * The job has received the cancel command, but is still running. */ CANCELING, diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java index 3ee4ac2910..e04f27c74e 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/JobStatus.java @@ -64,32 +64,67 @@ public class JobStatus implements Serializable { this.finishTime = builder.finishTime; } + /** + * Creates a new builder. + * + * @return Builder. + */ public static Builder builder() { return new Builder(); } + /** + * Returns job ID. + * + * @return Job ID. + */ public UUID id() { return id; } + /** + * Returns job state. + * + * @return Job state. + */ public JobState state() { return state; } + /** + * Returns job create time. + * + * @return Job create time. + */ public Instant createTime() { return createTime; } + /** + * Returns job start time. {@code null} if the job has not started yet. + * + * @return Job start time. {@code null} if the job has not started yet. + */ @Nullable public Instant startTime() { return startTime; } + /** + * Returns job finish time. {@code null} if the job has not finished yet. + * + * @return Job finish time. {@code null} if the job has not finished yet. + */ @Nullable public Instant finishTime() { return finishTime; } + /** + * Returns a new builder with the same property values as this JobStatus. + * + * @return Builder. + */ public Builder toBuilder() { return new Builder(this); } @@ -106,6 +141,9 @@ public class JobStatus implements Serializable { @Nullable private Instant finishTime; + /** + * Constructor. + */ public Builder() { } @@ -117,31 +155,66 @@ public class JobStatus implements Serializable { this.finishTime = status.finishTime; } + /** + * Sets job ID. 
+ * + * @param id Job ID. + * @return This builder. + */ public Builder id(UUID id) { this.id = id; return this; } + /** + * Sets job state. + * + * @param state Job state. + * @return This builder. + */ public Builder state(JobState state) { this.state = state; return this; } + /** + * Sets job create time. + * + * @param createTime Job create time. + * @return This builder. + */ public Builder createTime(Instant createTime) { this.createTime = createTime; return this; } + /** + * Sets job start time. + * + * @param startTime Job start time. + * @return This builder. + */ public Builder startTime(@Nullable Instant startTime) { this.startTime = startTime; return this; } + /** + * Sets job finish time. + * + * @param finishTime Job finish time. + * @return This builder. + */ public Builder finishTime(@Nullable Instant finishTime) { this.finishTime = finishTime; return this; } + /** + * Builds a new JobStatus. + * + * @return JobStatus. + */ public JobStatus build() { return new JobStatus(this); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs index 7b1fede7cc..25ac8b6c99 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeClusterAwarenessTests.cs @@ -45,14 +45,14 @@ namespace Apache.Ignite.Tests.Compute using var client = await IgniteClient.StartAsync(clientCfg); client.WaitForConnections(3); - var res2 = await client.Compute.ExecuteAsync<string>( + IJobExecution<string> exec2 = await client.Compute.ExecuteAsync<string>( new[] { server2.Node }, Array.Empty<DeploymentUnit>(), jobClassName: string.Empty); - var res3 = await client.Compute.ExecuteAsync<string>( + IJobExecution<string> exec3 = await client.Compute.ExecuteAsync<string>( new[] { server3.Node }, Array.Empty<DeploymentUnit>(), jobClassName: string.Empty); - 
Assert.AreEqual("s2", res2); - Assert.AreEqual("s3", res3); + Assert.AreEqual("s2", await exec2.GetResultAsync()); + Assert.AreEqual("s3", await exec3.GetResultAsync()); Assert.AreEqual(ClientOp.ComputeExecute, server2.ClientOps.Single()); Assert.AreEqual(ClientOp.ComputeExecute, server3.ClientOps.Single()); @@ -69,14 +69,14 @@ namespace Apache.Ignite.Tests.Compute using var client = await server1.ConnectClientAsync(); - var res2 = await client.Compute.ExecuteAsync<string>( + IJobExecution<string> exec2 = await client.Compute.ExecuteAsync<string>( new[] { server2.Node }, Array.Empty<DeploymentUnit>(), jobClassName: string.Empty); - var res3 = await client.Compute.ExecuteAsync<string>( + IJobExecution<string> exec3 = await client.Compute.ExecuteAsync<string>( new[] { server3.Node }, Array.Empty<DeploymentUnit>(), jobClassName: string.Empty); - Assert.AreEqual("s1", res2); - Assert.AreEqual("s1", res3); + Assert.AreEqual("s1", await exec2.GetResultAsync()); + Assert.AreEqual("s1", await exec3.GetResultAsync()); Assert.AreEqual(new[] { ClientOp.ComputeExecute, ClientOp.ComputeExecute }, server1.ClientOps); Assert.IsEmpty(server2.ClientOps); @@ -105,9 +105,12 @@ namespace Apache.Ignite.Tests.Compute for (int i = 0; i < 100; i++) { var node = i % 2 == 0 ? 
server1.Node : server2.Node; - var res = await client.Compute.ExecuteAsync<string>( + + IJobExecution<string> jobExecution = await client.Compute.ExecuteAsync<string>( new[] { node }, Array.Empty<DeploymentUnit>(), jobClassName: string.Empty); + string res = await jobExecution.GetResultAsync(); + nodeNames.Add(res); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs index 145357c8b7..838265a57d 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs @@ -26,6 +26,7 @@ namespace Apache.Ignite.Tests.Compute using System.Threading.Tasks; using Ignite.Compute; using Ignite.Table; + using Internal.Compute; using Internal.Network; using Internal.Proto; using Network; @@ -86,14 +87,15 @@ namespace Apache.Ignite.Tests.Compute var res1 = await Client.Compute.ExecuteAsync<string>(await GetNodeAsync(0), Units, NodeNameJob, "-", 11); var res2 = await Client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, NodeNameJob, ":", 22); - Assert.AreEqual(PlatformTestNodeRunner + "-_11", res1); - Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", res2); + Assert.AreEqual(PlatformTestNodeRunner + "-_11", await res1.GetResultAsync()); + Assert.AreEqual(PlatformTestNodeRunner + "_2:_22", await res2.GetResultAsync()); } [Test] public async Task TestExecuteOnRandomNode() { - var res = await Client.Compute.ExecuteAsync<string>(await Client.GetClusterNodesAsync(), Units, NodeNameJob); + var jobExecution = await Client.Compute.ExecuteAsync<string>(await Client.GetClusterNodesAsync(), Units, NodeNameJob); + var res = await jobExecution.GetResultAsync(); var expectedNodeNames = Enumerable.Range(1, 4) .Select(x => x == 1 ? 
PlatformTestNodeRunner : PlatformTestNodeRunner + "_" + x) @@ -103,10 +105,10 @@ namespace Apache.Ignite.Tests.Compute } [Test] - public void TestExecuteResultTypeMismatchThrowsInvalidCastException() + public async Task TestExecuteResultTypeMismatchThrowsInvalidCastException() { - Assert.ThrowsAsync<InvalidCastException>(async () => - await Client.Compute.ExecuteAsync<Guid>(await Client.GetClusterNodesAsync(), Units, NodeNameJob)); + var jobExecution = await Client.Compute.ExecuteAsync<Guid>(await Client.GetClusterNodesAsync(), Units, NodeNameJob); + Assert.ThrowsAsync<InvalidCastException>(async () => await jobExecution.GetResultAsync()); } [Test] @@ -114,13 +116,13 @@ namespace Apache.Ignite.Tests.Compute { var nodes = await GetNodeAsync(0); - IDictionary<IClusterNode, Task<string>> taskMap = Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123"); + IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap = Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123"); var res = await taskMap[nodes[0]]; Assert.AreEqual(1, taskMap.Count); Assert.AreSame(nodes[0], taskMap.Keys.Single()); - Assert.AreEqual(PlatformTestNodeRunner + "123", res); + Assert.AreEqual(PlatformTestNodeRunner + "123", await res.GetResultAsync()); } [Test] @@ -128,7 +130,7 @@ namespace Apache.Ignite.Tests.Compute { var nodes = await Client.GetClusterNodesAsync(); - IDictionary<IClusterNode, Task<string>> taskMap = Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123"); + IDictionary<IClusterNode, Task<IJobExecution<string>>> taskMap = Client.Compute.BroadcastAsync<string>(nodes, Units, NodeNameJob, "123"); var res1 = await taskMap[nodes[0]]; var res2 = await taskMap[nodes[1]]; var res3 = await taskMap[nodes[2]]; @@ -136,10 +138,10 @@ namespace Apache.Ignite.Tests.Compute Assert.AreEqual(4, taskMap.Count); - Assert.AreEqual(nodes[0].Name + "123", res1); - Assert.AreEqual(nodes[1].Name + "123", res2); - Assert.AreEqual(nodes[2].Name + "123", 
res3); - Assert.AreEqual(nodes[3].Name + "123", res4); + Assert.AreEqual(nodes[0].Name + "123", await res1.GetResultAsync()); + Assert.AreEqual(nodes[1].Name + "123", await res2.GetResultAsync()); + Assert.AreEqual(nodes[2].Name + "123", await res3.GetResultAsync()); + Assert.AreEqual(nodes[3].Name + "123", await res4.GetResultAsync()); } [Test] @@ -147,7 +149,7 @@ namespace Apache.Ignite.Tests.Compute { var res = await Client.Compute.ExecuteAsync<string>(await Client.GetClusterNodesAsync(), Units, ConcatJob, 1.1, Guid.Empty, "3", null); - Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null", res); + Assert.AreEqual("1.1_00000000-0000-0000-0000-000000000000_3_null", await res.GetResultAsync()); } [Test] @@ -155,14 +157,14 @@ namespace Apache.Ignite.Tests.Compute { var res = await Client.Compute.ExecuteAsync<string>(await Client.GetClusterNodesAsync(), Units, ConcatJob, args: null); - Assert.IsNull(res); + Assert.IsNull(await res.GetResultAsync()); } [Test] - public void TestJobErrorPropagatesToClientWithClassAndMessage() + public async Task TestJobErrorPropagatesToClientWithClassAndMessage() { - var ex = Assert.ThrowsAsync<IgniteException>(async () => - await Client.Compute.ExecuteAsync<string>(await Client.GetClusterNodesAsync(), Units, ErrorJob, "unused")); + var jobExecution = await Client.Compute.ExecuteAsync<string>(await Client.GetClusterNodesAsync(), Units, ErrorJob, "unused"); + var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExecution.GetResultAsync()); StringAssert.Contains("Custom job error", ex!.Message); @@ -230,7 +232,8 @@ namespace Apache.Ignite.Tests.Compute { var nodes = await Client.GetClusterNodesAsync(); var str = expectedStr ?? 
val.ToString()!.Replace("E+", "E"); - var res = await Client.Compute.ExecuteAsync<object>(nodes, Units, EchoJob, val, str); + IJobExecution<object> resExec = await Client.Compute.ExecuteAsync<object>(nodes, Units, EchoJob, val, str); + object res = await resExec.GetResultAsync(); Assert.AreEqual(val, res); } @@ -268,9 +271,9 @@ namespace Apache.Ignite.Tests.Compute var nodeName = nodeIdx == 1 ? string.Empty : "_" + nodeIdx; var expectedNodeName = PlatformTestNodeRunner + nodeName; - Assert.AreEqual(expectedNodeName, resNodeName); - Assert.AreEqual(expectedNodeName, resNodeName2); - Assert.AreEqual(expectedNodeName, resNodeName3); + Assert.AreEqual(expectedNodeName, await resNodeName.GetResultAsync()); + Assert.AreEqual(expectedNodeName, await resNodeName2.GetResultAsync()); + Assert.AreEqual(expectedNodeName, await resNodeName3.GetResultAsync()); // We only connect to 2 of 4 nodes because of different auth settings. if (nodeIdx < 3) @@ -306,17 +309,22 @@ namespace Apache.Ignite.Tests.Compute { // Create table and use it in ExecuteColocated. var nodes = await GetNodeAsync(0); - var tableName = await Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, "drop_me"); + var tableNameExec = await Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, "drop_me"); + var tableName = await tableNameExec.GetResultAsync(); try { var keyTuple = new IgniteTuple { [KeyCol] = 1L }; - var resNodeName = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, NodeNameJob); + var resNodeNameExec = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, NodeNameJob); + var resNodeName = await resNodeNameExec.GetResultAsync(); // Drop table and create a new one with a different ID, then execute a computation again. // This should update the cached table and complete the computation successfully. 
- await Client.Compute.ExecuteAsync<string>(nodes, Units, DropTableJob, tableName); - await Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, tableName); + var dropExec = await Client.Compute.ExecuteAsync<string>(nodes, Units, DropTableJob, tableName); + await dropExec.GetResultAsync(); + + var createExec = await Client.Compute.ExecuteAsync<string>(nodes, Units, CreateTableJob, tableName); + await createExec.GetResultAsync(); if (forceLoadAssignment) { @@ -324,21 +332,22 @@ namespace Apache.Ignite.Tests.Compute table.SetFieldValue("_partitionAssignment", null); } - var resNodeName2 = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, NodeNameJob); + var resNodeName2Exec = await Client.Compute.ExecuteColocatedAsync<string>(tableName, keyTuple, Units, NodeNameJob); + var resNodeName2 = await resNodeName2Exec.GetResultAsync(); Assert.AreEqual(resNodeName, resNodeName2); } finally { - await Client.Compute.ExecuteAsync<string>(nodes, Units, DropTableJob, tableName); + await (await Client.Compute.ExecuteAsync<string>(nodes, Units, DropTableJob, tableName)).GetResultAsync(); } } [Test] - public void TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace() + public async Task TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace() { - var ex = Assert.ThrowsAsync<IgniteException>(async () => - await Client.Compute.ExecuteAsync<object>(await GetNodeAsync(1), Units, ExceptionJob, "foo-bar")); + var jobExecution = await Client.Compute.ExecuteAsync<object>(await GetNodeAsync(1), Units, ExceptionJob, "foo-bar"); + var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExecution.GetResultAsync()); Assert.AreEqual("Test exception: foo-bar", ex!.Message); Assert.IsNotNull(ex.InnerException); @@ -353,10 +362,10 @@ namespace Apache.Ignite.Tests.Compute } [Test] - public void TestCheckedExceptionInJobPropagatesToClient() + public async Task 
TestCheckedExceptionInJobPropagatesToClient() { - var ex = Assert.ThrowsAsync<IgniteException>(async () => - await Client.Compute.ExecuteAsync<object>(await GetNodeAsync(1), Units, CheckedExceptionJob, "foo-bar")); + var jobExecution = await Client.Compute.ExecuteAsync<object>(await GetNodeAsync(1), Units, CheckedExceptionJob, "foo-bar"); + var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExecution.GetResultAsync()); Assert.AreEqual("TestCheckedEx: foo-bar", ex!.Message); Assert.IsNotNull(ex.InnerException); @@ -377,18 +386,18 @@ namespace Apache.Ignite.Tests.Compute using var client = await server.ConnectClientAsync(); var res = await client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), units, FakeServer.GetDetailsJob); - StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", res); + StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", await res.GetResultAsync()); // Lazy enumerable. var res2 = await client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), units.Reverse(), FakeServer.GetDetailsJob); - StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest", res2); + StringAssert.Contains("Units = unit1|1.0.0, unit-latest|latest", await res2.GetResultAsync()); // Colocated. 
var keyTuple = new IgniteTuple { ["ID"] = 1 }; var res3 = await client.Compute.ExecuteColocatedAsync<string>( FakeServer.ExistingTableName, keyTuple, units, FakeServer.GetDetailsJob); - StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", res3); + StringAssert.Contains("Units = unit-latest|latest, unit1|1.0.0", await res3.GetResultAsync()); } [Test] @@ -466,7 +475,8 @@ namespace Apache.Ignite.Tests.Compute using var client = await IgniteClient.StartAsync(GetConfig()); const int sleepMs = 3000; - var jobTask = client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, SleepJob, sleepMs); + var jobExecution = await client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, SleepJob, sleepMs); + var jobTask = jobExecution.GetResultAsync(); // Wait a bit and close the connection. await Task.Delay(10); @@ -476,6 +486,93 @@ namespace Apache.Ignite.Tests.Compute Assert.AreEqual("Connection closed.", ex!.Message); } + [Test] + public async Task TestJobExecutionStatusExecuting() + { + const int sleepMs = 3000; + var beforeStart = SystemClock.Instance.GetCurrentInstant(); + + var jobExecution = await Client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, SleepJob, sleepMs); + + await AssertJobStatus(jobExecution, JobState.Executing, beforeStart); + } + + [Test] + public async Task TestJobExecutionStatusCompleted() + { + const int sleepMs = 1; + var beforeStart = SystemClock.Instance.GetCurrentInstant(); + + var jobExecution = await Client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, SleepJob, sleepMs); + await jobExecution.GetResultAsync(); + + await AssertJobStatus(jobExecution, JobState.Completed, beforeStart); + } + + [Test] + public async Task TestJobExecutionStatusFailed() + { + var beforeStart = SystemClock.Instance.GetCurrentInstant(); + + var jobExecution = await Client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, ErrorJob, "unused"); + Assert.CatchAsync(async () => await jobExecution.GetResultAsync()); 
+ + await AssertJobStatus(jobExecution, JobState.Failed, beforeStart); + } + + [Test] + public async Task TestJobExecutionStatusNull() + { + var fakeJobExecution = new JobExecution<int>( + Guid.NewGuid(), Task.FromException<(int, JobStatus)>(new Exception("x")), (Compute)Client.Compute); + + var status = await fakeJobExecution.GetStatusAsync(); + + Assert.IsNull(status); + } + + [Test] + public async Task TestJobExecutionCancel() + { + const int sleepMs = 5000; + var beforeStart = SystemClock.Instance.GetCurrentInstant(); + + var jobExecution = await Client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, SleepJob, sleepMs); + await jobExecution.CancelAsync(); + + await AssertJobStatus(jobExecution, JobState.Canceled, beforeStart); + } + + [Test] + public async Task TestChangePriority() + { + var jobExecution = await Client.Compute.ExecuteAsync<string>(await GetNodeAsync(1), Units, SleepJob, 5000); + var res = await jobExecution.ChangePriorityAsync(123); + + // Job exists, but is already executing. + Assert.IsFalse(res); + } + + private static async Task AssertJobStatus<T>(IJobExecution<T> jobExecution, JobState state, Instant beforeStart) + { + JobStatus? 
status = await jobExecution.GetStatusAsync(); + + Assert.IsNotNull(status); + Assert.AreEqual(jobExecution.Id, status!.Id); + Assert.AreEqual(state, status.State); + Assert.Greater(status.CreateTime, beforeStart); + Assert.Greater(status.StartTime, status.CreateTime); + + if (state is JobState.Canceled or JobState.Completed or JobState.Failed) + { + Assert.Greater(status.FinishTime, status.StartTime); + } + else + { + Assert.IsNull(status.FinishTime); + } + } + private async Task<List<IClusterNode>> GetNodeAsync(int index) => (await Client.GetClusterNodesAsync()).OrderBy(n => n.Name).Skip(index).Take(1).ToList(); } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs index 493a6ab897..dc1a974633 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs @@ -299,7 +299,19 @@ namespace Apache.Ignite.Tests case ClientOp.ComputeExecuteColocated: { using var pooledArrayBuffer = ComputeExecute(reader, colocated: opCode == ClientOp.ComputeExecuteColocated); - Send(handler, requestId, ReadOnlyMemory<byte>.Empty); + + using var resWriter = new PooledArrayBuffer(); + + var rw = resWriter.MessageWriter; + if (opCode == ClientOp.ComputeExecuteColocated) + { + // Schema version. + rw.Write(1); + } + + rw.Write(Guid.NewGuid()); + + Send(handler, requestId, resWriter); Send(handler, requestId, pooledArrayBuffer, isNotification: true); continue; } @@ -656,6 +668,14 @@ namespace Apache.Ignite.Tests writer.Write(builder.Build().Span); + // Status + writer.Write(Guid.NewGuid()); + writer.Write(0); // State. + writer.Write(0L); // Create time. + writer.Write(0); + writer.WriteNil(); // Start time. + writer.WriteNil(); // Finish time. 
+ return arrayBufferWriter; } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs index cba89ada6a..2c24bb3ebb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/PartitionAwarenessRealClusterTests.cs @@ -50,12 +50,14 @@ public class PartitionAwarenessRealClusterTests : IgniteTestsBase { var keyTuple = new IgniteTuple { ["KEY"] = key }; - var primaryNodeName = await client.Compute.ExecuteColocatedAsync<string>( + var primaryNodeNameExec = await client.Compute.ExecuteColocatedAsync<string>( TableName, keyTuple, Array.Empty<DeploymentUnit>(), ComputeTests.NodeNameJob); + var primaryNodeName = await primaryNodeNameExec.GetResultAsync(); + if (primaryNodeName.EndsWith("_3", StringComparison.Ordinal) || primaryNodeName.EndsWith("_4", StringComparison.Ordinal)) { // Skip nodes without direct client connection. 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs index 4539084c7c..3e0d6282b4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Proto/ColocationHashTests.cs @@ -194,13 +194,15 @@ public class ColocationHashTests : IgniteTestsBase using var writer = ProtoCommon.GetMessageWriter(); var clientColocationHash = ser.Write(writer, null, schema, key); - var serverColocationHash = await Client.Compute.ExecuteAsync<int>( + var serverColocationHashExec = await Client.Compute.ExecuteAsync<int>( clusterNodes, Array.Empty<DeploymentUnit>(), TableRowColocationHashJob, tableName, i); + var serverColocationHash = await serverColocationHashExec.GetResultAsync(); + Assert.AreEqual(serverColocationHash, clientColocationHash, key.ToString()); } } @@ -327,7 +329,7 @@ public class ColocationHashTests : IgniteTestsBase { var nodes = await Client.GetClusterNodesAsync(); - return await Client.Compute.ExecuteAsync<int>( + IJobExecution<int> jobExecution = await Client.Compute.ExecuteAsync<int>( nodes, Array.Empty<DeploymentUnit>(), ColocationHashJob, @@ -335,6 +337,8 @@ public class ColocationHashTests : IgniteTestsBase bytes, timePrecision, timestampPrecision); + + return await jobExecution.GetResultAsync(); } private record TestIndexProvider(Func<int, int> ColumnOrderDelegate, int HashedColumnCount) : IHashedColumnIndexProvider diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs index b5ff24bccf..f164946556 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Table/SchemaSynchronizationTest.cs @@ -292,8 +292,11 @@ public class SchemaSynchronizationTest : IgniteTestsBase break; case 
TestMode.Compute: - await Client.Compute.ExecuteColocatedAsync<string, Poco>( + var jobExecution = await Client.Compute.ExecuteColocatedAsync<string, Poco>( table.Name, new Poco(1, "foo"), Array.Empty<DeploymentUnit>(), ComputeTests.NodeNameJob); + + await jobExecution.GetResultAsync(); + break; default: diff --git a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs index 7e26b024f8..78f7c130b2 100644 --- a/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs +++ b/modules/platforms/dotnet/Apache.Ignite/ClientOperationType.cs @@ -129,6 +129,21 @@ namespace Apache.Ignite /// <summary> /// SQL script (<see cref="ISql.ExecuteScriptAsync"/>). /// </summary> - SqlExecuteScript + SqlExecuteScript, + + /// <summary> + /// Get status of a compute job (<see cref="IJobExecution{T}.GetStatusAsync"/>). + /// </summary> + ComputeGetStatus, + + /// <summary> + /// Cancel compute job (<see cref="IJobExecution{T}.CancelAsync"/>). + /// </summary> + ComputeCancel, + + /// <summary> + /// Change compute job priority (<see cref="IJobExecution{T}.ChangePriorityAsync"/>). 
+ /// </summary> + ComputeChangePriority } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs index 5175b475cf..d8b52db983 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Compute/ICompute.cs @@ -36,7 +36,7 @@ public interface ICompute /// <param name="args">Job arguments.</param> /// <typeparam name="T">Job result type.</typeparam> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> - Task<T> ExecuteAsync<T>( + Task<IJobExecution<T>> ExecuteAsync<T>( IEnumerable<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, @@ -52,7 +52,7 @@ public interface ICompute /// <param name="args">Job arguments.</param> /// <typeparam name="T">Job result type.</typeparam> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> - Task<T> ExecuteColocatedAsync<T>( + Task<IJobExecution<T>> ExecuteColocatedAsync<T>( string tableName, IIgniteTuple key, IEnumerable<DeploymentUnit> units, @@ -70,7 +70,7 @@ public interface ICompute /// <typeparam name="T">Job result type.</typeparam> /// <typeparam name="TKey">Key type.</typeparam> /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns> - Task<T> ExecuteColocatedAsync<T, TKey>( + Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>( string tableName, TKey key, IEnumerable<DeploymentUnit> units, @@ -87,7 +87,7 @@ public interface ICompute /// <param name="args">Job arguments.</param> /// <typeparam name="T">Job result type.</typeparam> /// <returns>A map of <see cref="Task"/> representing the asynchronous operation for every node.</returns> - IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>( + IDictionary<IClusterNode, Task<IJobExecution<T>>> BroadcastAsync<T>( IEnumerable<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, diff --git 
a/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs new file mode 100644 index 0000000000..5d050a1c2a --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite/Compute/IJobExecution.cs @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Compute; + +using System; +using System.Threading.Tasks; + +/// <summary> +/// Job control object, provides information about the job execution process and result, allows cancelling the job. +/// </summary> +/// <typeparam name="T">Job result type.</typeparam> +public interface IJobExecution<T> +{ + /// <summary> + /// Gets the job ID. + /// </summary> + Guid Id { get; } + + /// <summary> + /// Gets the job execution result. + /// </summary> + /// <returns>Job execution result.</returns> + Task<T> GetResultAsync(); + + /// <summary> + /// Gets the job execution status. Can be <c>null</c> if the job status no longer exists due to exceeding the retention time limit. + /// </summary> + /// <returns> + /// Job execution status. Can be <c>null</c> if the job status no longer exists due to exceeding the retention time limit. 
+ /// </returns> + Task<JobStatus?> GetStatusAsync(); + + /// <summary> + /// Cancels the job execution. + /// </summary> + /// <returns> + /// Returns <c>true</c> if the job was successfully cancelled, <c>false</c> if the job has already finished, + /// <c>null</c> if the job was not found (no longer exists due to exceeding the retention time limit). + /// </returns> + Task<bool?> CancelAsync(); + + /// <summary> + /// Changes the job priority. After priority change the job will be the last in the queue of jobs with the same priority. + /// </summary> + /// <param name="priority">New priority.</param> + /// <returns> + /// Returns <c>true</c> if the priority was successfully changed, + /// <c>false</c> when the priority couldn't be changed (job is already executing or completed), + /// <c>null</c> if the job was not found (no longer exists due to exceeding the retention time limit). + /// </returns> + Task<bool?> ChangePriorityAsync(int priority); +} diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java b/modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs similarity index 51% copy from modules/api/src/main/java/org/apache/ignite/compute/JobState.java copy to modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs index 4c7d18a4cc..ada84bf8e6 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java +++ b/modules/platforms/dotnet/Apache.Ignite/Compute/JobState.cs @@ -15,39 +15,40 @@ * limitations under the License. */ -package org.apache.ignite.compute; +namespace Apache.Ignite.Compute; -/** - * Compute job's state enum. - */ -public enum JobState { - /** - * The job is submitted and waiting for an execution start. - */ - QUEUED, +/// <summary> +/// Compute job state. +/// </summary> +public enum JobState +{ + /// <summary> + /// The job is submitted and waiting for an execution start. + /// </summary> + Queued, - /** - * The job is being executed. 
- */ - EXECUTING, + /// <summary> + /// The job is being executed. + /// </summary> + Executing, - /** - * The job was unexpectedly terminated during execution. - */ - FAILED, + /// <summary> + /// The job was unexpectedly terminated during execution. + /// </summary> + Failed, - /** - * The job was executed successfully and the execution result was returned. - */ - COMPLETED, + /// <summary> + /// The job was executed successfully and the execution result was returned. + /// </summary> + Completed, - /** - * The job has received the cancel command, but it is still running. - */ - CANCELING, + /// <summary> + /// The job has received the cancel command, but is still running. + /// </summary> + Canceling, - /** - * The job was successfully cancelled. - */ - CANCELED; + /// <summary> + /// The job was successfully cancelled. + /// </summary> + Canceled } diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java b/modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs similarity index 55% copy from modules/api/src/main/java/org/apache/ignite/compute/JobState.java copy to modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs index 4c7d18a4cc..b5bb9232d8 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/JobState.java +++ b/modules/platforms/dotnet/Apache.Ignite/Compute/JobStatus.cs @@ -15,39 +15,22 @@ * limitations under the License. */ -package org.apache.ignite.compute; +namespace Apache.Ignite.Compute; -/** - * Compute job's state enum. - */ -public enum JobState { - /** - * The job is submitted and waiting for an execution start. - */ - QUEUED, - - /** - * The job is being executed. - */ - EXECUTING, - - /** - * The job was unexpectedly terminated during execution. - */ - FAILED, - - /** - * The job was executed successfully and the execution result was returned. - */ - COMPLETED, - - /** - * The job has received the cancel command, but it is still running. 
- */ - CANCELING, +using System; +using NodaTime; - /** - * The job was successfully cancelled. - */ - CANCELED; -} +/// <summary> +/// Compute job status. +/// </summary> +/// <param name="Id">Job ID.</param> +/// <param name="State">State.</param> +/// <param name="CreateTime">Create time.</param> +/// <param name="StartTime">Start time (<c>null</c> when not yet started).</param> +/// <param name="FinishTime">Finish time (<c>null</c> when not yet finished).</param> +public sealed record JobStatus( + Guid Id, + JobState State, + Instant CreateTime, + Instant? StartTime, + Instant? FinishTime); diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs index 5d938590d0..208e4ee04d 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs @@ -60,7 +60,7 @@ namespace Apache.Ignite.Internal.Compute } /// <inheritdoc/> - public async Task<T> ExecuteAsync<T>( + public async Task<IJobExecution<T>> ExecuteAsync<T>( IEnumerable<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, @@ -76,7 +76,7 @@ namespace Apache.Ignite.Internal.Compute } /// <inheritdoc/> - public async Task<T> ExecuteColocatedAsync<T>( + public async Task<IJobExecution<T>> ExecuteColocatedAsync<T>( string tableName, IIgniteTuple key, IEnumerable<DeploymentUnit> units, @@ -92,7 +92,7 @@ namespace Apache.Ignite.Internal.Compute .ConfigureAwait(false); /// <inheritdoc/> - public async Task<T> ExecuteColocatedAsync<T, TKey>( + public async Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>( string tableName, TKey key, IEnumerable<DeploymentUnit> units, @@ -109,7 +109,7 @@ namespace Apache.Ignite.Internal.Compute .ConfigureAwait(false); /// <inheritdoc/> - public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>( + public IDictionary<IClusterNode, Task<IJobExecution<T>>> BroadcastAsync<T>( 
IEnumerable<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, @@ -119,12 +119,12 @@ namespace Apache.Ignite.Internal.Compute IgniteArgumentCheck.NotNull(jobClassName); IgniteArgumentCheck.NotNull(units); - var res = new Dictionary<IClusterNode, Task<T>>(); + var res = new Dictionary<IClusterNode, Task<IJobExecution<T>>>(); var units0 = units as ICollection<DeploymentUnit> ?? units.ToList(); // Avoid multiple enumeration. foreach (var node in nodes) { - var task = ExecuteOnNodes<T>(new[] { node }, units0, jobClassName, args); + Task<IJobExecution<T>> task = ExecuteOnNodes<T>(new[] { node }, units0, jobClassName, args); res[node] = task; } @@ -135,6 +135,59 @@ namespace Apache.Ignite.Internal.Compute /// <inheritdoc/> public override string ToString() => IgniteToStringBuilder.Build(GetType()); + /// <summary> + /// Gets the job status. + /// </summary> + /// <param name="jobId">Job ID.</param> + /// <returns>Job status, or <c>null</c> when the server response contains nil instead of a status.</returns> + internal async Task<JobStatus?> GetJobStatusAsync(Guid jobId) + { + using var writer = ProtoCommon.GetMessageWriter(); + writer.MessageWriter.Write(jobId); + + using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeGetStatus, writer).ConfigureAwait(false); + return Read(res.GetReader()); + + JobStatus? Read(MsgPackReader reader) => reader.TryReadNil() ? null : ReadJobStatus(reader); + } + + /// <summary> + /// Cancels the job. + /// </summary> + /// <param name="jobId">Job ID.</param> + /// <returns> + /// <c>true</c> when the job is cancelled, <c>false</c> when the job couldn't be cancelled + /// (either it's not yet started, or it's already completed), or <c>null</c> if there's no job with the specified id. 
+ /// </returns> + internal async Task<bool?> CancelJobAsync(Guid jobId) + { + using var writer = ProtoCommon.GetMessageWriter(); + writer.MessageWriter.Write(jobId); + + using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeCancel, writer).ConfigureAwait(false); + return res.GetReader().ReadBooleanNullable(); + } + + /// <summary> + /// Changes the job priority. After priority change the job will be the last in the queue of jobs with the same priority. + /// </summary> + /// <param name="jobId">Job id.</param> + /// <param name="priority">New priority.</param> + /// <returns> + /// Returns <c>true</c> if the priority was successfully changed, + /// <c>false</c> when the priority couldn't be changed (job is already executing or completed), + /// <c>null</c> if the job was not found (no longer exists due to exceeding the retention time limit). + /// </returns> + internal async Task<bool?> ChangeJobPriorityAsync(Guid jobId, int priority) + { + using var writer = ProtoCommon.GetMessageWriter(); + writer.MessageWriter.Write(jobId); + writer.MessageWriter.Write(priority); + + using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeChangePriority, writer).ConfigureAwait(false); + return res.GetReader().ReadBooleanNullable(); + } + [SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "Secure random is not required here.")] private static IClusterNode GetRandomNode(ICollection<IClusterNode> nodes) { @@ -198,7 +251,47 @@ namespace Apache.Ignite.Internal.Compute }); } - private async Task<T> ExecuteOnNodes<T>( + private static JobStatus ReadJobStatus(MsgPackReader reader) + { + var id = reader.ReadGuid(); + var state = (JobState)reader.ReadInt32(); + var createTime = reader.ReadInstantNullable(); + var startTime = reader.ReadInstantNullable(); + var endTime = reader.ReadInstantNullable(); + + return new JobStatus(id, state, createTime.GetValueOrDefault(), startTime, endTime); + } + + private IJobExecution<T> 
GetJobExecution<T>(PooledBuffer computeExecuteResult, bool readSchema) + { + var reader = computeExecuteResult.GetReader(); + + if (readSchema) + { + _ = reader.ReadInt32(); + } + + var jobId = reader.ReadGuid(); + var resultTask = GetResult((NotificationHandler)computeExecuteResult.Metadata!); + + return new JobExecution<T>(jobId, resultTask, this); + + static async Task<(T, JobStatus)> GetResult(NotificationHandler handler) + { + using var notificationRes = await handler.Task.ConfigureAwait(false); + return Read(notificationRes.GetReader()); + } + + static (T, JobStatus) Read(MsgPackReader reader) + { + var res = (T)reader.ReadObjectFromBinaryTuple()!; + var status = ReadJobStatus(reader); + + return (res, status); + } + } + + private async Task<IJobExecution<T>> ExecuteOnNodes<T>( ICollection<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, @@ -213,9 +306,7 @@ namespace Apache.Ignite.Internal.Compute ClientOp.ComputeExecute, writer, PreferredNode.FromName(node.Name), expectNotifications: true) .ConfigureAwait(false); - var notificationHandler = (NotificationHandler)res.Metadata!; - using var notificationRes = await notificationHandler.Task.ConfigureAwait(false); - return Read(notificationRes); + return GetJobExecution<T>(res, readSchema: false); void Write() { @@ -231,13 +322,6 @@ namespace Apache.Ignite.Internal.Compute w.WriteObjectCollectionAsBinaryTuple(args); } - - static T Read(in PooledBuffer buf) - { - var reader = buf.GetReader(); - - return (T)reader.ReadObjectFromBinaryTuple()!; - } } private async Task<Table> GetTableAsync(string tableName) @@ -261,7 +345,7 @@ namespace Apache.Ignite.Internal.Compute } [SuppressMessage("Maintainability", "CA1508:Avoid dead conditional code", Justification = "False positive")] - private async Task<T> ExecuteColocatedAsync<T, TKey>( + private async Task<IJobExecution<T>> ExecuteColocatedAsync<T, TKey>( string tableName, TKey key, Func<Table, IRecordSerializerHandler<TKey>> 
serializerHandlerFunc, @@ -292,9 +376,7 @@ namespace Apache.Ignite.Internal.Compute ClientOp.ComputeExecuteColocated, bufferWriter, preferredNode, expectNotifications: true) .ConfigureAwait(false); - var notificationHandler = (NotificationHandler)res.Metadata!; - using var notificationRes = await notificationHandler.Task.ConfigureAwait(false); - return Read(notificationRes); + return GetJobExecution<T>(res, readSchema: true); } catch (IgniteException e) when (e.Code == ErrorGroups.Client.TableIdNotFound) { @@ -336,11 +418,6 @@ namespace Apache.Ignite.Internal.Compute return colocationHash; } - - static T Read(in PooledBuffer buf) - { - return (T)buf.GetReader().ReadObjectFromBinaryTuple()!; - } } } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs new file mode 100644 index 0000000000..441f7068cd --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/JobExecution.cs @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Internal.Compute; + +using System; +using System.Threading.Tasks; +using Ignite.Compute; + +/// <summary> +/// Job execution. 
+/// </summary> +/// <typeparam name="T">Job result type.</typeparam> +internal sealed record JobExecution<T> : IJobExecution<T> +{ + private readonly Task<(T Result, JobStatus Status)> _resultTask; + + private readonly Compute _compute; + + private volatile JobStatus? _finalStatus; + + /// <summary> + /// Initializes a new instance of the <see cref="JobExecution{T}"/> class. + /// </summary> + /// <param name="id">Job id.</param> + /// <param name="resultTask">Result task.</param> + /// <param name="compute">Compute.</param> + public JobExecution(Guid id, Task<(T Result, JobStatus Status)> resultTask, Compute compute) + { + Id = id; + _resultTask = resultTask; + _compute = compute; + + // Wait for completion in background and cache the status. + _ = CacheStatusOnCompletion(); + } + + /// <inheritdoc/> + public Guid Id { get; } + + /// <inheritdoc/> + public async Task<T> GetResultAsync() + { + var (result, _) = await _resultTask.ConfigureAwait(false); + return result; + } + + /// <inheritdoc/> + public async Task<JobStatus?> GetStatusAsync() + { + var finalStatus = _finalStatus; + if (finalStatus != null) + { + return finalStatus; + } + + var status = await _compute.GetJobStatusAsync(Id).ConfigureAwait(false); + if (status is { State: JobState.Completed or JobState.Failed or JobState.Canceled }) + { + // Can't be transitioned to another state, cache it. 
+ _finalStatus = status; + } + + return status; + } + + /// <inheritdoc/> + public async Task<bool?> CancelAsync() => + await _compute.CancelJobAsync(Id).ConfigureAwait(false); + + /// <inheritdoc/> + public async Task<bool?> ChangePriorityAsync(int priority) => + await _compute.ChangeJobPriorityAsync(Id, priority).ConfigureAwait(false); + + private async Task CacheStatusOnCompletion() + { + var (_, status) = await _resultTask.ConfigureAwait(false); + + _finalStatus = status; + } +} diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs index 213990c209..80b3cecdfd 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOp.cs @@ -119,6 +119,15 @@ namespace Apache.Ignite.Internal.Proto SqlExecScript = 56, /** SQL parameter metadata. */ - SqlParamMeta = 57 + SqlParamMeta = 57, + + /** Get compute job status. */ + ComputeGetStatus = 59, + + /** Cancel compute job. */ + ComputeCancel = 60, + + /** Change compute job priority. 
*/ + ComputeChangePriority = 61 } } diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs index 71c0803cf8..abab50c462 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/ClientOpExtensions.cs @@ -54,6 +54,9 @@ namespace Apache.Ignite.Internal.Proto ClientOp.TupleContainsKey => ClientOperationType.TupleContainsKey, ClientOp.ComputeExecute => ClientOperationType.ComputeExecute, ClientOp.ComputeExecuteColocated => ClientOperationType.ComputeExecute, + ClientOp.ComputeGetStatus => ClientOperationType.ComputeGetStatus, + ClientOp.ComputeCancel => ClientOperationType.ComputeCancel, + ClientOp.ComputeChangePriority => ClientOperationType.ComputeChangePriority, ClientOp.SqlExec => ClientOperationType.SqlExecute, ClientOp.SqlExecScript => ClientOperationType.SqlExecuteScript, ClientOp.SqlCursorNextPage => null, diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs index e86f3defa1..30357ca7bc 100644 --- a/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs +++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Proto/MsgPack/MsgPackReader.cs @@ -22,6 +22,7 @@ using System.Buffers.Binary; using System.IO; using BinaryTuple; using Ignite.Sql; +using NodaTime; /// <summary> /// MsgPack reader. @@ -80,6 +81,12 @@ internal ref struct MsgPackReader var invalid => throw GetInvalidCodeException("bool", invalid) }; + /// <summary> + /// Reads a nullable boolean value: <c>null</c> is preserved instead of being collapsed to <c>false</c>. + /// </summary> + /// <returns>The boolean value, or <c>null</c> when <see cref="TryReadNil"/> consumes a nil.</returns> + public bool? ReadBooleanNullable() => TryReadNil() ? null : ReadBoolean(); + /// <summary> /// Reads a short value. 
/// </summary> @@ -214,6 +221,14 @@ internal ref struct MsgPackReader return UuidSerializer.Read(GetSpan(16)); } + /// <summary> + /// Reads a nullable Instant value, encoded as Unix epoch seconds (int64) followed by a nanosecond adjustment (int32). + /// </summary> + /// <returns>Instant, or <c>null</c> when the value is nil.</returns> + public Instant? ReadInstantNullable() => TryReadNil() + ? null + : Instant.FromUnixTimeSeconds(ReadInt64()).PlusNanoseconds(ReadInt32()); + /// <summary> /// Skips a value. /// </summary> diff --git a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs index 925f04afb2..41dcbcfa53 100644 --- a/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs +++ b/modules/platforms/dotnet/Apache.Ignite/RetryReadPolicy.cs @@ -58,6 +58,9 @@ namespace Apache.Ignite ClientOperationType.ComputeExecute => false, ClientOperationType.SqlExecute => false, ClientOperationType.SqlExecuteScript => false, + ClientOperationType.ComputeCancel => false, + ClientOperationType.ComputeChangePriority => false, + ClientOperationType.ComputeGetStatus => true, var unsupported => throw new NotSupportedException("Unsupported operation type: " + unsupported) }; }