Hello Alexey,
The sample code is given below:
@ComputeTaskSessionFullSupport
public class SplitExampleJgraphWithComplexDAGIgniteCachesample extends
ComputeTaskSplitAdapter<CustomDirectedAcyclicGraph<String, DefaultEdge> ,
Integer> {
// Auto-injected task session.
@TaskSessionResource
private ComputeTaskSession ses;
private static final Random random = new Random();
static int noOftasksExecutedSuccess = 0;
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss:SSS");
@Override protected Collection<? extends ComputeJob> split(int
clusterSize, CustomDirectedAcyclicGraph<String, DefaultEdge> graph) {
Collection<ComputeJob> jobs = new LinkedList<>();
IgniteCache<String, Object> cacheUp =
Ignition.ignite().getOrCreateCache("cacheNameNew");
ses.addAttributeListener((key, val) -> {
if ("COMPLETE".compareTo(key.toString()) == 0) {
nextTaskToExecute(graph, cacheUp);
}
}, false);
String task = null;
if (cacheUp.get("CurrentVertex") != null)
task = (String) cacheUp.get("CurrentVertex");
for (DefaultEdge outgoingEdge : graph.outgoingEdgesOf(task)) {
String sourceVertex = graph.getEdgeSource(outgoingEdge);
String targetVertex = graph.getEdgeTarget(outgoingEdge);
graph.setTargetVertex(targetVertex);
executingJobsBuilt(graph, jobs);
} if (task != null && graph.outgoingEdgesOf(task).size() == 0) {
if (cacheUp.get(task) != null && (Boolean)cacheUp.get(task)) {
String targetVertex = setNextVertexInCache(graph,
cacheUp);
graph.setTargetVertex(targetVertex);
nextTaskToExecute(graph, cacheUp);
} else {
System.out.println("else parttttt");
}
}
return jobs;
}
private void nextTaskToExecute(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph,
IgniteCache<String, Object> cacheUp) {
Ignite ignite = Ignition.ignite();
if (cacheUp.get("NextVertex") != null) {
String processingVertex = (String)
cacheUp.get("NextVertex");
if (processingVertex != null &&
areParentVerticesProcessed(graph,
processingVertex, cacheUp)) {
cacheUp.put("CurrentVertex", processingVertex);
// Execute task on the cluster and wait for its
completion.
ignite.compute().execute(SplitExampleJgraphWithComplexDAGIgniteCachesample.class,
graph);
}
}
}
private void executingJobsBuilt(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph, Collection<ComputeJob> jobs) {
String targetVertex = graph.getTargetVertex();
IgniteCache<String, Object> cacheNew =
Ignition.ignite().getOrCreateCache("cacheNameNew");
if (targetVertex != null && !cacheNew.containsKey(targetVertex)) {
jobs.add(new ComputeJobAdapter() {
// Auto-injected job context.
@JobContextResource
private ComputeJobContext jobCtx;
@Nullable @Override public Object execute() {
int duration1 = 8000 + random.nextInt(100);
SimpleDateFormat dateFormatNew = new
SimpleDateFormat("HH:mm:ss:SSS");
String task = (String) targetVertex;
try {
Thread.sleep(duration1);
System.out.println("****************************executed the job **********
" + task + "******" + dateFormatNew.format(new Date()));
cacheNew.put(task, true);
} catch (Exception e1) {
e1.printStackTrace();
}
ses.setAttribute("NEXTVERTEX",
setNextVertexInCache(graph, cacheNew));
ses.setAttribute("COMPLETE", duration1);
return duration1;
}
});
}
}
private String setNextVertexInCache(CustomDirectedAcyclicGraph<String,
DefaultEdge> graph, IgniteCache<String, Object> cache) {
String task = null;
Set<String> dagSourceVertex = graph.vertexSet();
Iterator itr = dagSourceVertex.iterator();
while (itr.hasNext()) {
task = (String)itr.next();
if(cache.get("CurrentVertex") != null &&
!task.equalsIgnoreCase((String)cache.get("CurrentVertex")))
continue;
else {
task = (String)itr.next();
cache.put("NextVertex", task);
break;
}
}
return task;
}
private Boolean
areParentVerticesProcessed(CustomDirectedAcyclicGraph<String, DefaultEdge>
graph, String task, IgniteCache<String, Object> cache) {
Boolean processed = false;
for (DefaultEdge incomingEdge : graph.incomingEdgesOf(task)) {
//graph.outgoingEdgesOf(dagSourceVertex)
String sourceVertex = graph.getEdgeSource(incomingEdge);
String targetVertex = graph.getEdgeTarget(incomingEdge);
if (cache!= null && cache.get(sourceVertex) != null) {
processed = true;
}
}
return processed;
}
/** {@inheritDoc} */
@Nullable @Override public Integer reduce(List<ComputeJobResult>
results) {
int sum = 0;
for (ComputeJobResult res : results) {
sum += res.<Integer>getData();
}
return sum;
}
}
******************************************************************
The task is invoked as follows:
// Starts the task asynchronously, passing the graph returned by buildGraph() as its argument.
// NOTE(review): the class literal here has no "sample" suffix, whereas the task class posted
// in this message is SplitExampleJgraphWithComplexDAGIgniteCachesample — confirm which is intended.
ignite.compute().executeAsync(SplitExampleJgraphWithComplexDAGIgniteCache.class,
buildGraph());
buildGraph() is defined as follows:
/**
 * Builds the sample DAG: a "root" vertex plus vertices "1" to "14", connected by
 * fifteen edges, with "root" set as the graph's current vertex.
 *
 * @return the populated graph.
 */
private CustomDirectedAcyclicGraph<String, DefaultEdge> buildGraph() {
    final String rootVertex = "root";

    // Vertices in the exact order in which they are inserted into the graph.
    final String[] vertexOrder = {
        rootVertex, "1", "12", "11", "2", "3", "13", "9", "8", "7", "6", "5", "4", "14", "10"
    };

    // Directed edges as {source, target}, in insertion order.
    final String[][] edgePairs = {
        {rootVertex, "1"}, {rootVertex, "12"}, {rootVertex, "11"},
        {"1", "2"}, {"1", "3"},
        {"12", "13"},
        {"2", "9"}, {"2", "8"}, {"2", "7"},
        {"3", "6"}, {"3", "5"}, {"3", "4"},
        {"13", "4"}, {"13", "14"},
        {"9", "10"}
    };

    CustomDirectedAcyclicGraph<String, DefaultEdge> dag =
        new CustomDirectedAcyclicGraph<>(DefaultEdge.class);

    for (String vertex : vertexOrder) {
        dag.addVertex(vertex);
    }

    for (String[] pair : edgePairs) {
        dag.addEdge(pair[0], pair[1]);
    }

    dag.setCurrentVertex(rootVertex);

    return dag;
}
I hope that is sufficient for you to test this at your end on a single node,
as it is working fine on three nodes.
--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/