This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 878319753c6 IGNITE-27341 .NET: Propagate compute task names to Java
(#12609)
878319753c6 is described below
commit 878319753c61bbca02b4ed49c03e26fc75f0b9d9
Author: Ravil <[email protected]>
AuthorDate: Thu Jan 15 15:42:19 2026 +0300
IGNITE-27341 .NET: Propagate compute task names to Java (#12609)
Ensure that the correct .NET task or closure name is visible in system
views and internal APIs.
Co-authored-by: Pavel Tupitsyn <[email protected]>
---
.../platform/compute/PlatformAbstractTask.java | 8 +
.../compute/PlatformBalancingMultiClosureTask.java | 3 +
.../PlatformBalancingSingleClosureTask.java | 3 +
.../PlatformBroadcastingMultiClosureTask.java | 3 +
.../PlatformBroadcastingSingleClosureTask.java | 3 +
.../platform/compute/PlatformFullTask.java | 3 -
.../processors/task/GridTaskProcessor.java | 8 +
.../Compute/ComputeTaskName.cs | 202 +++++++++++++++++++++
8 files changed, 230 insertions(+), 3 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
index 8764f22af96..5a84b27d9a2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
@@ -49,6 +49,9 @@ public abstract class PlatformAbstractTask implements
ComputeTask<Object, Void>
/** Done flag. */
protected boolean done;
+ /** Platform task name. */
+ protected String taskName;
+
/**
* Constructor.
*
@@ -60,6 +63,11 @@ public abstract class PlatformAbstractTask implements
ComputeTask<Object, Void>
this.taskPtr = taskPtr;
}
+ /** @return Platform task name, or {@code null} if none has been assigned to this task. */
+ public String taskName() {
+ return taskName;
+ }
+
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) {
assert rcvd.isEmpty() : "Should not cache result in Java for interop
task";
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
index 65ae50061a8..188499bbe77 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
@@ -79,5 +79,8 @@ public class PlatformBalancingMultiClosureTask extends
PlatformAbstractTask {
*/
public void jobs(Collection<PlatformJob> jobs) {
this.jobs = jobs;
+ // Only the first job names the task; taskName is left untouched when F.isEmpty(jobs).
+ if (!F.isEmpty(jobs))
+ this.taskName = jobs.iterator().next().name();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
index e2dc7aad70e..89a57c82583 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingSingleClosureTask.java
@@ -77,5 +77,8 @@ public class PlatformBalancingSingleClosureTask extends
PlatformAbstractTask {
*/
public void job(PlatformJob job) {
this.job = job;
+ // The task is named after its single job; a null job leaves taskName untouched.
+ if (job != null)
+ this.taskName = job.name();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
index d1771f7072c..6d4f92d2859 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingMultiClosureTask.java
@@ -84,5 +84,8 @@ public class PlatformBroadcastingMultiClosureTask extends
PlatformAbstractTask {
*/
public void jobs(Collection<PlatformJob> jobs) {
this.jobs = jobs;
+ // Only the first job names the task; taskName is left untouched when F.isEmpty(jobs).
+ if (!F.isEmpty(jobs))
+ this.taskName = jobs.iterator().next().name();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
index 613d69ab8f3..0e961342e14 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBroadcastingSingleClosureTask.java
@@ -81,5 +81,8 @@ public class PlatformBroadcastingSingleClosureTask extends
PlatformAbstractTask
*/
public void job(PlatformJob job) {
this.job = job;
+ // The task is named after its single job; a null job leaves taskName untouched.
+ if (job != null)
+ this.taskName = job.name();
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
index ec7752a1ba8..89540b3ae12 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java
@@ -57,9 +57,6 @@ public final class PlatformFullTask extends
PlatformAbstractTask {
/** Cluster group. */
private final ClusterGroup grp;
- /** Platform task name. */
- private final String taskName;
-
/** {@code true} if distribution of the session attributes should be
enabled. */
private final boolean taskSesFullSupport;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index ebcfc3ad5ee..b33d2907ebc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -72,6 +72,7 @@ import
org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupp
import org.apache.ignite.internal.processors.job.ComputeJobStatusEnum;
import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import
org.apache.ignite.internal.processors.platform.compute.PlatformAbstractTask;
import org.apache.ignite.internal.processors.platform.compute.PlatformFullTask;
import org.apache.ignite.internal.processors.task.monitor.ComputeGridMonitor;
import org.apache.ignite.internal.processors.task.monitor.ComputeTaskStatus;
@@ -641,6 +642,13 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
}
}
+ if (task instanceof PlatformAbstractTask) {
+ String taskName0 = ((PlatformAbstractTask)task).taskName();
+
+ if (taskName0 != null)
+ taskName = taskName0;
+ }
+
assert taskName != null;
if (log.isDebugEnabled())
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskName.cs
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskName.cs
new file mode 100644
index 00000000000..f013e89fb0f
--- /dev/null
+++
b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTaskName.cs
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Compute
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading;
+ using Apache.Ignite.Core.Cache.Query;
+ using Apache.Ignite.Core.Compute;
+ using NUnit.Framework;
+ using static TestUtils;
+
+ /// <summary>
+ /// Tests that .NET compute task and closure names are propagated to Java and shown in the SYS.TASKS system view.
+ /// </summary>
+ public class ComputeTaskNameTest : TestBase
+ {
+ /// <summary>
+ /// .NET compute task name is propagated into platform task.
+ /// </summary>
+ [Test]
+ public void TaskNameTakenFromPlatformTask()
+ {
+ // Start a long-running task (3000 ms job delay) without awaiting it, so it can be observed while running
+ var task = new LongTask(3000);
+ var cts = new CancellationTokenSource();
+ Ignite.GetCompute().ExecuteAsync(task, 123, cts.Token);
+
+ try
+ {
+ // Exactly one row is expected (Single): column 0 is the .NET task type name, column 1 the Java task class
+ var res = Ignite
+ .GetOrCreateCache<string, string>("test")
+ .Query(new SqlFieldsQuery("SELECT TASK_NAME,
TASK_CLASS_NAME FROM SYS.TASKS", null))
+ .GetAll()
+ .Single();
+
+
Assert.AreEqual("Apache.Ignite.Core.Tests.Compute.ComputeTaskNameTest+LongTask",
res[0]);
+
Assert.AreEqual("org.apache.ignite.internal.processors.platform.compute.PlatformFullTask",
res[1]);
+ }
+ finally
+ {
+ cts.Cancel();
+ }
+ }
+
+ /// <summary>
+ /// .NET compute closure name is propagated into platform task.
+ /// </summary>
+ [Test]
+ public void ClosureNameTakenFromPlatformTask()
+ {
+ // Start a long-running closure (3000 ms delay) without awaiting it, so it can be observed while running
+ var clo = new LongClosure(3000);
+ var cts = new CancellationTokenSource();
+ Ignite.GetCompute().CallAsync(clo, cts.Token);
+
+ try
+ {
+ // Exactly one row is expected (Single): column 0 is the .NET closure type name, column 1 the Java task class
+ var res = Ignite
+ .GetOrCreateCache<string, string>("test")
+ .Query(new SqlFieldsQuery("SELECT TASK_NAME,
TASK_CLASS_NAME FROM SYS.TASKS", null))
+ .GetAll()
+ .Single();
+
+
Assert.AreEqual("Apache.Ignite.Core.Tests.Compute.LongClosure", res[0]);
+
Assert.AreEqual("org.apache.ignite.internal.processors.platform.compute.PlatformBalancingSingleClosureTask",
res[1]);
+ }
+ finally
+ {
+ cts.Cancel();
+ }
+ }
+
+ /// <summary>
+ /// .NET broadcast compute closure name is propagated into platform
task.
+ /// </summary>
+ [Test]
+ public void BroadcastClosureNameTakenFromPlatformTask()
+ {
+ // Start a long-running broadcast closure (3000 ms delay) without awaiting it, so it can be observed while running
+ var clo = new LongClosure(3000);
+ var cts = new CancellationTokenSource();
+ Ignite.GetCompute().BroadcastAsync(clo, cts.Token);
+
+ try
+ {
+ // Exactly one row is expected (Single): column 0 is the .NET closure type name, column 1 the Java task class
+ var res = Ignite
+ .GetOrCreateCache<string, string>("test")
+ .Query(new SqlFieldsQuery("SELECT TASK_NAME,
TASK_CLASS_NAME FROM SYS.TASKS", null))
+ .GetAll()
+ .Single();
+
+
Assert.AreEqual("Apache.Ignite.Core.Tests.Compute.LongClosure", res[0]);
+
Assert.AreEqual("org.apache.ignite.internal.processors.platform.compute.PlatformBroadcastingSingleClosureTask",
res[1]);
+ }
+ finally
+ {
+ cts.Cancel();
+ }
+ }
+
+ /// <summary>
+ /// Task that splits into a single <see cref="LongJob"/> and returns that job's result.
+ /// </summary>
+ [ComputeTaskSessionFullSupport]
+ private class LongTask : ComputeTaskSplitAdapter<int, string, string>
+ {
+ /// <summary> Delay time in milliseconds. </summary>
+ private readonly int _delay;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="delay">Execution delay time in
milliseconds</param>
+ public LongTask(int delay)
+ {
+ _delay = delay;
+ }
+
+ /// <inheritdoc />
+ public override string Reduce(IList<IComputeJobResult<string>>
results) => results[0].Data;
+
+ /// <inheritdoc />
+ protected override ICollection<IComputeJob<string>> Split(int
gridSize, int attrValue)
+ {
+ return new List<IComputeJob<string>> { new LongJob(_delay) };
+ }
+ }
+
+ /// <summary>
+ /// Job that sleeps for the configured delay before returning its result.
+ /// </summary>
+ private class LongJob : ComputeJobAdapter<string>
+ {
+ /// <summary> Delay time in milliseconds. </summary>
+ private readonly int _delay;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="delay">Execution delay time in
milliseconds</param>
+ public LongJob(int delay)
+ {
+ _delay = delay;
+ }
+
+ /// <inheritdoc />
+ public override string Execute()
+ {
+ Thread.Sleep(_delay);
+
+ return "OK";
+ }
+ }
+ }
+
+ /// <summary>
+ /// Closure that sleeps for the configured delay before returning its result.
+ /// </summary>
+ [Serializable]
+ public class LongClosure : IComputeFunc<String>
+ {
+ /// <summary> Delay time in milliseconds. </summary>
+ private readonly int _delay;
+
+ /// <summary>
+ /// Constructor.
+ /// </summary>
+ /// <param name="delay">Execution delay time in milliseconds</param>
+ public LongClosure(int delay)
+ {
+ _delay = delay;
+ }
+
+ /// <inheritdoc />
+ public string Invoke()
+ {
+ Thread.Sleep(_delay);
+
+ return "OK";
+ }
+ }
+}