[ https://issues.apache.org/jira/browse/IGNITE-22349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859636#comment-17859636 ]
Ignite TC Bot commented on IGNITE-22349: ---------------------------------------- {panel:title=Branch: [pull/11373/head] Base: [master] : No blockers found!|borderStyle=dashed|borderColor=#ccc|titleBGColor=#D6F7C1}{panel} {panel:title=Branch: [pull/11373/head] Base: [master] : New Tests (2)|borderStyle=dashed|borderColor=#ccc|titleBGColor=#D6F7C1} {color:#00008b}Platform .NET (Core Linux){color} [[tests 2|https://ci2.ignite.apache.org/viewLog.html?buildId=7927135]] * {color:#013220}DotNetCore: ComputeTaskSessionTest.DistributesTaskSessionAttributeLocally - PASSED{color} * {color:#013220}DotNetCore: ComputeTaskSessionTest.DistributesTaskSessionAttributeRemotely - PASSED{color} {panel} [TeamCity *--> Run :: All* Results|https://ci2.ignite.apache.org/viewLog.html?buildId=7927170&buildTypeId=IgniteTests24Java8_RunAll] > Ignite.NET support for priority ordering of Compute jobs > -------------------------------------------------------- > > Key: IGNITE-22349 > URL: https://issues.apache.org/jira/browse/IGNITE-22349 > Project: Ignite > Issue Type: Improvement > Components: .NET, compute, platforms > Affects Versions: 2.16 > Reporter: Alexey Kukushkin > Assignee: Alexey Kukushkin > Priority: Major > Labels: .net, compute, ise, platforms > Time Spent: 50m > Remaining Estimate: 0h > > I want Apache Ignite to support [priority > ordering|https://ignite.apache.org/docs/latest/distributed-computing/job-scheduling#priority-ordering] > of Ignite.NET compute jobs on the same node. > h4. Analysis > {{PriorityQueueCollisionSpi}} does priority ordering. The problem is that > {{PriorityQueueCollisionSpi}} expects the user to provide job priorities > via the {{ComputeTaskSession}}'s "{{grid.task.priority}}" attribute, and the > {{ComputeTaskSession}} is not available in Ignite.NET. > It looks like the requirement is to add an injectable {{ComputeTaskSession}} > in Ignite.NET exposing the {{SetAttributes}} operation similar to how it > works in Java. > h4. 
Reproducer > I expect more or less ordered output from the reproducer below. The output > may not be completely ordered, since completely ordered output requires all > the jobs to land on the server node in a single batch, and this reproducer > cannot guarantee that: > {noformat} > >>> Completed job with priority 0 > >>> Completed job with priority 9 > >>> Completed job with priority 8 > >>> Completed job with priority 7 > >>> Completed job with priority 6 > >>> Completed job with priority 5 > >>> Completed job with priority 4 > >>> Completed job with priority 3 > >>> Completed job with priority 2 > >>> Completed job with priority 1 > {noformat} > {{PriorityQueueCollisionSpiTest.cs}}: > {code:java} > public class PriorityQueueCollisionSpiTest > { > private static ITestOutputHelper? _output; > public PriorityQueueCollisionSpiTest(ITestOutputHelper output) > { > _output = output; > } > /// <summary> > /// Schedules jobs according to <see > cref="IComputeTask{TArg,TJobRes,TRes}"/>'s priority. > /// </summary> > [Fact] > public void SchedulesJobsAccordingToTaskPriority() > { > // Given an Ignite cluster consisting of server and client nodes > using var ignored = Ignition.Start(GetIgniteConfiguration("server1")); > var igniteConfiguration = GetIgniteConfiguration("app1"); > igniteConfiguration.ClientMode = true; > using var ignite = Ignition.Start(igniteConfiguration); > var igniteCompute = ignite.GetCompute(); > > // And the user asynchronously executes multiple tasks, each task > starting a job having increasing priority > const int jobCount = 10; > ICollection<Task> futureResultCollection = new List<Task>(jobCount); > for (var priority = 0; priority < jobCount; priority++) > { > var task = new PriorityTask(priority); > var futureResult = igniteCompute.ExecuteAsync(task, jobCount); > futureResultCollection.Add(futureResult); > } > // When all the jobs complete > Task.WaitAll(futureResultCollection.ToArray()); > > // Then the ">>> Completed job with priority" console 
output > demonstrates that the jobs completed in the > // decreasing priority order, more or less. > } > private static IgniteConfiguration GetIgniteConfiguration(string > igniteName) => > new() > { > ConsistentId = igniteName, > IgniteInstanceName = igniteName, > SpringConfigUrl = "ignite-sandbox.xml", > DiscoverySpi = new TcpDiscoverySpi > { > IpFinder = new TcpDiscoveryStaticIpFinder {Endpoints = new > List<string> {"127.0.0.1:48700"}}, > LocalPort = 48700 > }, > FailureDetectionTimeout = TimeSpan.FromMinutes(10), > ClientFailureDetectionTimeout = TimeSpan.FromMinutes(10), > JvmOptions = new List<string> > {"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"} > }; > /// <summary> > /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> implementation that > single <see cref="IComputeJob{TRes}"/>s with > /// the specified priority. > /// </summary> > [ComputeTaskSessionFullSupport] > private sealed class PriorityTask : ComputeTaskSplitAdapter<int, bool, > bool> > { > private readonly int _priority; > [TaskSessionResource] private IComputeTaskSession _taskSession; > public PriorityTask(int priority) > { > _priority = priority; > } > /// <inheritdoc /> > public override bool Reduce(IList<IComputeJobResult<bool>> results) > => results.All(r => r.Data); > /// <inheritdoc /> > protected override ICollection<IComputeJob<bool>> Split(int gridSize, > int jobCount) > { > IComputeJob<bool> job = new PriorityJob(_priority); > _taskSession.SetAttribute("grid.task.priority", _priority); > var actual = _taskSession.GetAttribute<int>("grid.task.priority"); > Assert.Equal(_priority, actual); > return new List<IComputeJob<bool>> {job}; > } > } > /// <summary> > /// <see cref="IComputeJob{TRes}"/> implementation with a priority > indicator. 
> /// </summary> > private class PriorityJob : ComputeJobAdapter<bool> > { > private readonly int _priority; > public PriorityJob(int priority) > { > _priority = priority; > } > /// <inheritdoc /> > public override bool Execute() > { > _output?.WriteLine($">>> Completed job with priority > {_priority}"); > return true; > } > } > } > {code} > {{ignite-sandbox.xml}}: > {code:xml} > <?xml version="1.0" encoding="utf-8"?> > <beans xmlns="http://www.springframework.org/schema/beans" > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > xmlns:util="http://www.springframework.org/schema/util" > xmlns:context="http://www.springframework.org/schema/context" > xsi:schemaLocation=" > http://www.springframework.org/schema/beans > http://www.springframework.org/schema/beans/spring-beans.xsd > http://www.springframework.org/schema/util > http://www.springframework.org/schema/util/spring-util-2.5.xsd" > > > <bean class="org.apache.ignite.configuration.IgniteConfiguration"> > <property name="collisionSpi"> > <bean > class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi"> > <property name="parallelJobsNumber" value="1"/> > <property name="starvationPreventionEnabled" value="false"/> > </bean> > </property> > </bean> > </beans> > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)