Hello Alexey,

The sample code is given below:

@ComputeTaskSessionFullSupport
public class SplitExampleJgraphWithComplexDAGIgniteCachesample extends
ComputeTaskSplitAdapter<CustomDirectedAcyclicGraph&lt;String, DefaultEdge> ,
Integer> {
        
   
        // Auto-injected task session.
    @TaskSessionResource 
    private ComputeTaskSession ses;
    
    
    private static final Random random = new Random();
    static int noOftasksExecutedSuccess = 0;
    
    SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss:SSS");

    

    
    @Override protected Collection<? extends ComputeJob> split(int
clusterSize, CustomDirectedAcyclicGraph<String, DefaultEdge> graph) {
        Collection<ComputeJob> jobs = new LinkedList<>();
        
        IgniteCache<String, Object> cacheUp = 
Ignition.ignite().getOrCreateCache("cacheNameNew");
        ses.addAttributeListener((key, val) -> {
            if ("COMPLETE".compareTo(key.toString()) == 0) {
                nextTaskToExecute(graph, cacheUp);
                }
        }, false);
        
        String task = null;
        if (cacheUp.get("CurrentVertex") != null)
                task = (String) cacheUp.get("CurrentVertex");
        for (DefaultEdge outgoingEdge : graph.outgoingEdgesOf(task)) {
                String sourceVertex = graph.getEdgeSource(outgoingEdge);
            String targetVertex = graph.getEdgeTarget(outgoingEdge);
            graph.setTargetVertex(targetVertex);
            executingJobsBuilt(graph, jobs);
        } if (task != null && graph.outgoingEdgesOf(task).size() == 0) {
                if (cacheUp.get(task) != null && (Boolean)cacheUp.get(task)) {
                        String targetVertex = setNextVertexInCache(graph, 
cacheUp);
                        graph.setTargetVertex(targetVertex);
                        nextTaskToExecute(graph, cacheUp);
                } else {
                        System.out.println("else parttttt");
                }
        }
        return jobs;
    }

    
    
        private void nextTaskToExecute(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph,
                        IgniteCache<String, Object> cacheUp) {
                Ignite ignite = Ignition.ignite();
                if (cacheUp.get("NextVertex") != null) {
                        String processingVertex = (String) 
cacheUp.get("NextVertex");
                        if (processingVertex != null && 
areParentVerticesProcessed(graph,
processingVertex, cacheUp)) {
                                cacheUp.put("CurrentVertex", processingVertex);
                                // Execute task on the cluster and wait for its 
completion.
                        
ignite.compute().execute(SplitExampleJgraphWithComplexDAGIgniteCachesample.class,
graph);
                        }
                }
        }
    
    private void executingJobsBuilt(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph, Collection<ComputeJob> jobs) {
        String targetVertex = graph.getTargetVertex();
        IgniteCache<String, Object> cacheNew = 
Ignition.ignite().getOrCreateCache("cacheNameNew");
       if (targetVertex != null && !cacheNew.containsKey(targetVertex)) {
           jobs.add(new ComputeJobAdapter() {
                   // Auto-injected job context.
                    @JobContextResource
                    private ComputeJobContext jobCtx;
                    
               @Nullable @Override public Object execute() {
                   int duration1 = 8000 + random.nextInt(100);
                   SimpleDateFormat dateFormatNew = new
SimpleDateFormat("HH:mm:ss:SSS");
                   String task = (String) targetVertex;
                   try {
                           Thread.sleep(duration1);
                       
System.out.println("****************************executed the job **********  
" + task +  "******" + dateFormatNew.format(new Date()));
                        cacheNew.put(task, true);
                       } catch (Exception e1) {
                                                e1.printStackTrace();
                                        } 
                                        ses.setAttribute("NEXTVERTEX", 
setNextVertexInCache(graph, cacheNew));
                                        ses.setAttribute("COMPLETE", duration1);
                   return duration1;
               }
               
           });
           
           }
       
   }
    

        private String setNextVertexInCache(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph, IgniteCache<String, Object> cache) {
                String task = null; 
                Set<String> dagSourceVertex = graph.vertexSet();
            Iterator itr = dagSourceVertex.iterator();
              while (itr.hasNext()) {
                task = (String)itr.next();
                        if(cache.get("CurrentVertex") != null &&
!task.equalsIgnoreCase((String)cache.get("CurrentVertex")))
                                        continue;
                        else {
                                task = (String)itr.next();
                                cache.put("NextVertex", task);
                                break;
                        }
                }
                return task;
        }
        
        
        
        private Boolean
areParentVerticesProcessed(CustomDirectedAcyclicGraph<String, DefaultEdge>
graph, String task, IgniteCache<String, Object> cache) {
                Boolean processed = false;
                for (DefaultEdge incomingEdge : graph.incomingEdgesOf(task)) {
//graph.outgoingEdgesOf(dagSourceVertex)
                String sourceVertex = graph.getEdgeSource(incomingEdge);
            String targetVertex = graph.getEdgeTarget(incomingEdge);
            if (cache!= null && cache.get(sourceVertex) != null) {
                processed = true;
            }
                }
                return processed;
        }
        
    /** {@inheritDoc} */
    @Nullable @Override public Integer reduce(List<ComputeJobResult>
results) {
        int sum = 0;

        for (ComputeJobResult res : results) {
                sum += res.<Integer>getData();
        }
            

        return sum;
    }
}


******************************************************************
The task is invoked as shown below:
// Submits the task asynchronously with the graph produced by buildGraph().
// NOTE(review): the task class posted above is named
// SplitExampleJgraphWithComplexDAGIgniteCachesample - confirm that the class referenced
// here is the intended one.
ignite.compute().executeAsync(SplitExampleJgraphWithComplexDAGIgniteCache.class,
buildGraph());

buildGraph() is defined as follows:

/**
 * Builds the sample graph: a "root" vertex plus the vertices "1" to "14", connected by
 * fifteen directed edges, with "root" set as the graph's current vertex.
 *
 * @return the populated graph.
 */
private CustomDirectedAcyclicGraph<String, DefaultEdge> buildGraph() {
    CustomDirectedAcyclicGraph<String, DefaultEdge> dag =
        new CustomDirectedAcyclicGraph<String, DefaultEdge>(DefaultEdge.class);

    final String rootVertex = "root";

    // Vertices, listed in the order in which they are added to the graph.
    String[] vertices = {
        rootVertex, "1", "12", "11",
        "2", "3", "13",
        "9", "8", "7",
        "6", "5", "4", "14", "10"
    };

    for (String vertex : vertices) {
        dag.addVertex(vertex);
    }

    // Directed edges as {source, target} pairs, listed in insertion order.
    String[][] edges = {
        {rootVertex, "1"}, {rootVertex, "12"}, {rootVertex, "11"},
        {"1", "2"}, {"1", "3"}, {"12", "13"},
        {"2", "9"}, {"2", "8"}, {"2", "7"},
        {"3", "6"}, {"3", "5"}, {"3", "4"},
        {"13", "4"}, {"13", "14"},
        {"9", "10"}
    };

    for (String[] edge : edges) {
        dag.addEdge(edge[0], edge[1]);
    }

    dag.setCurrentVertex(rootVertex);

    return dag;
}


I hope that is sufficient for you to test it at your end on a single node,
as it works fine on three nodes.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Reply via email to