Hi Pieter!

The pattern you need is common and we have code for such loops in the
works. It is being delayed right now due to work on fault-tolerance / high
availability, but we plan to resume with that
after those issues are done.

Greetings,
Stephan


On Tue, Aug 18, 2015 at 12:38 PM, Pieter-Jan Van Aeken <
pieterjan.vanae...@euranova.eu> wrote:

> Hi,
>
>
>
> Thanks for the quick response. Thinking of a workaround right now, where I
> simply run the same application 100 times but I would like to avoid as much
> duplicate work as possible. Is it possible to "broadcast" nodes and edges
> datasets from one execution environment to the next without passing through
> a data sink to collect them into heap or write them to a file?
>
>
> Regards,
>
> Pieter-Jan Van Aeken
>
> 2015-08-18 12:11 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
>
> > Hello Pieter,
> >
> > Nested iterations are indeed not supported in Flink.
> >
> >
> http://mail-archives.apache.org/mod_mbox/flink-user/201504.mbox/%3Cop.xw24u7fhf7e33m@vaio-sb%3E
> >
> > The problem is not in your code.
> >
> >
> > On Tue, Aug 18, 2015 at 11:27 AM, Pieter-Jan Van Aeken <
> > pieterjan.vanae...@euranova.eu> wrote:
> >
> > > Hello all,
> > >
> > >
> > > I am having some troubles getting nested iterations to work. The basic
> > > outline of my application looks like this :
> > >
> > > 1. create vertex dataset
> > > 2. create edge dataset
> > > 3. bulk iterate 100 times on edges {
> > >     3a. Create graph from nodes and edges
> > >     3b. Perform GatherSumApply (delta iteration)
> > >     3c. Map Vertices
> > >     3d. Perform GatherSumApply in other direction (again a delta
> > iteration)
> > >     3e. Join with edges on target
> > >     3f. Output new edges
> > > }
> > > 4. write edges to file
> > >
> > > Am I correct in assuming that the two delta iterations (GSA) inside the
> > > bulk iteration are not allowed at this point in time? Or should I
> > continue
> > > looking for bugs in my code? The stack trace doesn't help me all that
> > much:
> > >
> > > Exception in thread "main" java.lang.IllegalStateException
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.AbstractPartialSolutionNode.getAlternativePlans(AbstractPartialSolutionNode.java:86)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:342)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:256)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.BulkIterationNode.instantiateCandidate(BulkIterationNode.java:301)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.addLocalCandidates(SingleInputNode.java:396)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.SingleInputNode.getAlternativePlans(SingleInputNode.java:333)
> > >     at
> > >
> > >
> >
> org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204)
> > >     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500)
> > >     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402)
> > >     at
> > >
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:173)
> > >     at
> > >
> > >
> >
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
> > >     at
> > >
> > >
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
> > >     at
> > >
> > >
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:580)
> > >
> > >
> > > Regards,
> > >
> > > Pieter-Jan Van Aeken
> > >
> >
>

Reply via email to