Hello all,

I am having some trouble getting nested iterations to work. The basic outline of my application looks like this:

1. Create vertex dataset
2. Create edge dataset
3. Bulk iterate 100 times on edges {
   3a. Create graph from nodes and edges
   3b. Perform GatherSumApply (delta iteration)
   3c. Map vertices
   3d. Perform GatherSumApply in the other direction (again a delta iteration)
   3e. Join with edges on target
   3f. Output new edges
   }
4. Write edges to file

Am I correct in assuming that the two delta iterations (GSA) inside the bulk iteration are not allowed at this point in time? Or should I continue looking for bugs in my code?

The stack trace doesn't help me all that much:

Exception in thread "main" java.lang.IllegalStateException
    at org.apache.flink.optimizer.dag.AbstractPartialSolutionNode.getAlternativePlans(AbstractPartialSolutionNode.java:86)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:342)
    at org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at org.apache.flink.optimizer.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:301)
    at org.apache.flink.optimizer.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:396)
    at org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:333)
    at org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:580)

Regards,
Pieter-Jan Van Aeken