[ 
https://issues.apache.org/jira/browse/IGNITE-22349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851606#comment-17851606
 ] 

Pavel Tupitsyn commented on IGNITE-22349:
-----------------------------------------

[~kukushal] I would like to do a review. Would you prefer to create a PR for 
that, or use JIRA comments?

> Ignite.NET support for priority ordering of Compute jobs
> --------------------------------------------------------
>
>                 Key: IGNITE-22349
>                 URL: https://issues.apache.org/jira/browse/IGNITE-22349
>             Project: Ignite
>          Issue Type: Improvement
>          Components: .NET, compute, platforms
>    Affects Versions: 2.16
>            Reporter: Alexey Kukushkin
>            Assignee: Alexey Kukushkin
>            Priority: Major
>              Labels: .net, compute, ise, platforms
>         Attachments: IGNITE-22349-README.md, ignite-22349.patch
>
>
> I want Apache Ignite to support [priority 
> ordering|https://ignite.apache.org/docs/latest/distributed-computing/job-scheduling#priority-ordering]
>  of Ignite.NET compute jobs on the same node.
> h4. Analysis
> {{PriorityQueueCollisionSpi}} does priority ordering. The problem is that 
> {{PriorityQueueCollisionSpi}} expects the user to provide job priorities 
> via the {{ComputeTaskSession}}'s "{{grid.task.priority}}" attribute, and the 
> {{ComputeTaskSession}} is not available in Ignite.NET.
> It looks like the requirement is to add an injectable {{ComputeTaskSession}} 
> in Ignite.NET exposing the {{SetAttributes}} operation similar to how it 
> works in Java.
> h4. Reproducer
> I expect more or less ordered output from the below reproducer. The output 
> may not be completely ordered since completely ordered output requires all 
> the jobs to land on the server node in a single batch and this reproducer 
> cannot guarantee that:
> {noformat}
> >>> Completed job with priority 0
> >>> Completed job with priority 9
> >>> Completed job with priority 8
> >>> Completed job with priority 7
> >>> Completed job with priority 6
> >>> Completed job with priority 5
> >>> Completed job with priority 4
> >>> Completed job with priority 3
> >>> Completed job with priority 2
> >>> Completed job with priority 1
> {noformat}
> {{PriorityQueueCollisionSpiTest.cs}}:
> {code:java}
> public class PriorityQueueCollisionSpiTest
> {
>     private static ITestOutputHelper? _output;
>     public PriorityQueueCollisionSpiTest(ITestOutputHelper output)
>     {
>         _output = output;
>     }
>     /// <summary>
>     /// Schedules jobs according to <see 
> cref="IComputeTask{TArg,TJobRes,TRes}"/>'s priority. 
>     /// </summary>
>     [Fact]
>     public void SchedulesJobsAccordingToTaskPriority()
>     {
>         // Given an Ignite cluster consisting of server and client nodes
>         using var ignored = Ignition.Start(GetIgniteConfiguration("server1"));
>         var igniteConfiguration = GetIgniteConfiguration("app1");
>         igniteConfiguration.ClientMode = true;
>         using var ignite = Ignition.Start(igniteConfiguration);
>         var igniteCompute = ignite.GetCompute();
>         
>         // And the user asynchronously executes multiple tasks, each task 
> starting a job having increasing priority
>         const int jobCount = 10;
>         ICollection<Task> futureResultCollection = new List<Task>(jobCount);
>         for (var priority = 0; priority < jobCount; priority++)
>         {
>             var task = new PriorityTask(priority);
>             var futureResult = igniteCompute.ExecuteAsync(task, jobCount);
>             futureResultCollection.Add(futureResult);
>         }
>         // When all the jobs complete
>         Task.WaitAll(futureResultCollection.ToArray());
>         
>         // Then the ">>> Completed job with priority" console output 
> demonstrates that the jobs completed in the
>         // decreasing priority order, more or less.
>     }
>     private static IgniteConfiguration GetIgniteConfiguration(string 
> igniteName) =>
>         new()
>         {
>             ConsistentId = igniteName,
>             IgniteInstanceName = igniteName,
>             SpringConfigUrl = "ignite-sandbox.xml",
>             DiscoverySpi = new TcpDiscoverySpi
>             {
>                 IpFinder = new TcpDiscoveryStaticIpFinder {Endpoints = new 
> List<string> {"127.0.0.1:48700"}},
>                 LocalPort = 48700
>             },
>             FailureDetectionTimeout = TimeSpan.FromMinutes(10),
>             ClientFailureDetectionTimeout = TimeSpan.FromMinutes(10),
>             JvmOptions = new List<string> 
> {"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"}
>         };
>     /// <summary>
>     /// <see cref="IComputeTask{TArg,TJobRes,TRes}"/> implementation that 
> creates a single <see cref="IComputeJob{TRes}"/> with
>     /// the specified priority.
>     /// </summary>
>     [ComputeTaskSessionFullSupport]
>     private sealed class PriorityTask : ComputeTaskSplitAdapter<int, bool, 
> bool>
>     {
>         private readonly int _priority;
>         [TaskSessionResource] private IComputeTaskSession _taskSession;
>         public PriorityTask(int priority)
>         {
>             _priority = priority;
>         }
>         /// <inheritdoc />
>         public override bool Reduce(IList<IComputeJobResult<bool>> results) 
> => results.All(r => r.Data);
>         /// <inheritdoc />
>         protected override ICollection<IComputeJob<bool>> Split(int gridSize, 
> int jobCount)
>         {
>             IComputeJob<bool> job = new PriorityJob(_priority);
>             _taskSession.SetAttribute("grid.task.priority", _priority);
>             var actual = _taskSession.GetAttribute<int>("grid.task.priority");
>             Assert.Equal(_priority, actual);
>             return new List<IComputeJob<bool>> {job};
>         }
>     }
>     /// <summary>
>     /// <see cref="IComputeJob{TRes}"/> implementation with a priority 
> indicator.
>     /// </summary>
>     private class PriorityJob : ComputeJobAdapter<bool>
>     {
>         private readonly int _priority;
>         public PriorityJob(int priority)
>         {
>             _priority = priority;
>         }
>         /// <inheritdoc />
>         public override bool Execute()
>         {
>             _output?.WriteLine($">>> Completed job with priority 
> {_priority}");
>             return true;
>         }
>     }
> }
> {code}
> {{ignite-sandbox.xml}}:
> {code:xml}
> <?xml version="1.0" encoding="utf-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xmlns:util="http://www.springframework.org/schema/util"
>        xmlns:context="http://www.springframework.org/schema/context"
>        xsi:schemaLocation="
>         http://www.springframework.org/schema/beans
>         http://www.springframework.org/schema/beans/spring-beans.xsd
>         http://www.springframework.org/schema/util
>         http://www.springframework.org/schema/util/spring-util-2.5.xsd"
> >
>     <bean class="org.apache.ignite.configuration.IgniteConfiguration">
>         <property name="collisionSpi">
>             <bean 
> class="org.apache.ignite.spi.collision.priorityqueue.PriorityQueueCollisionSpi">
>                 <property name="parallelJobsNumber" value="1"/>
>                 <property name="starvationPreventionEnabled" value="false"/>
>             </bean>
>         </property>
>     </bean>
> </beans>
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to