Hello all,

I am having some troubles getting nested iterations to work. The basic
outline of my application looks like this :

1. create vertex dataset
2. create edge dataset
3. bulk iterate 100 times on edges {
    3a. Create graph from nodes and edges
    3b. Perform GatherSumApply (delta iteration)
    3c. Map Vertices
    3d. Perform GatherSumApply in other direction (again a delta iteration)
    3e. Join with edges on target
    3f. Output new edges
}
4. write edges to file

Am I correct in assuming that the two delta iterations (GSA) inside the
bulk iteration are not allowed at this point in time? Or should I continue
looking for bugs in my code? The stack trace doesn't help me all that much:

Exception in thread "main" java.lang.IllegalStateException
    at
org.apache.flink.optimizer.dag.AbstractPartialSolutionNode.getAlternativePlans(AbstractPartialSolutionNode.java:86)
    at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at
org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:342)
    at
org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
    at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
    at
org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
    at
org.apache.flink.optimizer.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:301)
    at
org.apache.flink.optimizer.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:396)
    at
org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:333)
    at
org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
    at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
    at
org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
    at
org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
    at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
    at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:580)


Regards,

Pieter-Jan Van Aeken

Reply via email to