Hi! I have implemented a simple example where the solution set delta depends on the workset through a broadcast variable. It works in both the latest 0.6-SNAPSHOT and 0.5.1. Attached is the rendered execution plan, which shows the workset entering only through a broadcast (BC) variable.
Maybe the problem is something different. Can you post your program?

Stephan

My sample program:

    DataSet<Tuple2<Long, Long>> source =
        env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());

    DataSet<Tuple2<Long, Long>> invariantInput =
        env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());

    // iteration from here
    DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter =
        source.iterateDelta(source, 1000, 1);

    DataSet<Tuple2<Long, Long>> result = invariantInput
        .map(new IdentityMapper<Tuple2<Long, Long>>())
            .withBroadcastSet(iter.getWorkset(), "bc data")
        .join(iter.getSolutionSet()).where(0).equalTo(1)
        .projectFirst(1).projectSecond(1).types(Long.class, Long.class);

    iter.closeWith(result.map(new IdentityMapper<Tuple2<Long, Long>>()), result).print();

    OptimizedPlan p = compileNoStats(env.createProgramPlan());

On Thu, Jul 10, 2014 at 7:28 PM, Stephan Ewen <[email protected]> wrote:

> Hey Jack!
>
> Let me look into this. It should be okay to have the Solution Set Delta
> depend on the Workset via a Broadcast Variable. If the system prohibits
> that, it is not intentional.
>
> If you can provide us with a minimal example that produces the error, it
> would be great.
>
> Greetings,
> Stephan
>
> On Thu, Jul 10, 2014 at 3:25 PM, Vasiliki Kalavri <[email protected]> wrote:
>
>> Hi Jack,
>>
>> regarding the "solution set delta does not depend on the workset" issue,
>> in Delta iterations the state is maintained in the solution set and the
>> workset serves as the input to the next iteration. The solution set delta
>> represents the changes that need to be merged to the state (solution set)
>> at the end of each superstep. Therefore, the solution set delta needs to
>> depend on the input (the workset). However, as Fabian already said, this
>> is not detected if you're using a broadcast set. Why don't you provide it
>> as a regular input to your operator?
>>
>> Regarding the notion of keys, the Delta iteration assumes that each
>> element in the solution set is uniquely identified by a key and this is
>> how the merge with the solution set delta happens, after the end of a
>> superstep. One solution to your problem might be to create a new random
>> key for each element that you want to add to the solution set. Would
>> that be possible?
>>
>> Cheers,
>> V.
>>
>> On 7 July 2014 10:53, Fabian Hueske <[email protected]> wrote:
>>
>> > Hi Jake,
>> >
>> > the problem in the other thread was that, for testing purposes, a
>> > collection data source was used as delta set. So the working set and
>> > the delta set were not connected at all.
>> >
>> > In your program this is the case, though the connection is through a
>> > broadcast set which is not detected by the compiler.
>> >
>> > I am not very familiar with the iterations code, so I leave the tricky
>> > questions for somebody who knows the details.
>> >
>> > Best, Fabian
>> >
>> > 2014-07-06 10:38 GMT+02:00 Jack David Galilee <[email protected]>:
>> >
>> > > Hi,
>> > >
>> > > I know a similar problem to this was raised earlier last month from
>> > > the archive (
>> > > http://mail-archives.apache.org/mod_mbox/flink-dev/201406.mbox/browser).
>> > > However, I am unable to see if this was ever solved.
>> > >
>> > > I am encountering the same problem "In the given plan, the solution
>> > > set delta does not depend on the workset.", but what I can't
>> > > ascertain (having examined the PACT compiler (0.5.1)) is whether this
>> > > is a bug or an intentional design constraint placed on the delta
>> > > iteration operator.
>> > >
>> > > My algorithm sits between the Delta and Bulk iterative models as an
>> > > Incremental iterative algorithm. The solution set is the union of all
>> > > working sets up until the current working set is empty.
>> > >
>> > > The working set is broadcast to a single operator in the data-flow.
>> > > This appears to be the problem, the compiler is unable to determine
>> > > the dependency via this broadcast.
>> > >
>> > > To make things more complex my data does not suit the
>> > > pseudo-relational model Flink is designed around. I am dealing with
>> > > variable length sets / arrays so I can't join against the solution
>> > > set, or working set between iterations because the data has no notion
>> > > of keys.
>> > >
>> > > I can make it 'run' as a BulkIteration, but the result is the final
>> > > state (the empty working set) as at least the 0.5.1 API doesn't allow
>> > > all previous steps to be captured in a union - I essentially lose the
>> > > answer once the algorithm converges.
>> > >
>> > > Your opinion as to whether this is actually a bug, or if I am doing it
>> > > all completely wrong would be most appreciated.
>> > >
>> > > Cheers,
>> > >
>> > > Jack Galilee
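
To make the semantics discussed in this thread concrete, here is a minimal single-JVM sketch in plain Java. It does not use the Flink API: the class `DeltaIterationSketch` and the methods `unionOfWorksets` and `shrink` are hypothetical names invented for illustration. It models the solution set as a map merged by key at the end of each superstep, derives the solution set delta from the workset, and follows the suggestion of giving each new element a fresh key so that the solution set ends up as the union of all worksets. A sequential counter stands in for the random key mentioned above; in a parallel job the key would have to be unique across all parallel instances, which this sketch does not address.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

class DeltaIterationSketch {

    /**
     * Toy delta iteration: each superstep adds every workset element to the
     * solution set under a fresh synthetic key, then derives the next workset.
     * Stops when the workset is empty or maxIterations supersteps have run.
     */
    static Map<Long, List<Integer>> unionOfWorksets(
            List<List<Integer>> initialWorkset,
            UnaryOperator<List<List<Integer>>> step,
            int maxIterations) {
        Map<Long, List<Integer>> solutionSet = new LinkedHashMap<>();
        List<List<Integer>> workset = initialWorkset;
        long nextKey = 0; // assumption: a counter replaces the suggested random key
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // The solution set delta is computed from the workset.
            Map<Long, List<Integer>> delta = new LinkedHashMap<>();
            for (List<Integer> element : workset) {
                delta.put(nextKey++, element);
            }
            // Merge by key: equal keys would overwrite, fresh keys never collide.
            solutionSet.putAll(delta);
            workset = step.apply(workset);
        }
        return solutionSet;
    }

    /** Hypothetical step function: drop the last entry, discard singleton sets. */
    static List<List<Integer>> shrink(List<List<Integer>> workset) {
        List<List<Integer>> next = new ArrayList<>();
        for (List<Integer> set : workset) {
            if (set.size() > 1) {
                next.add(new ArrayList<>(set.subList(0, set.size() - 1)));
            }
        }
        return next;
    }

    public static void main(String[] args) {
        List<List<Integer>> initial =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        Map<Long, List<Integer>> result =
                unionOfWorksets(initial, DeltaIterationSketch::shrink, 1000);
        result.forEach((key, set) -> System.out.println(key + " -> " + set));
    }
}
```

With the toy input in `main`, the worksets of the three supersteps hold two, two, and one element, so the final solution set has five entries. If the elements were keyed by something that repeats between supersteps, the merge would overwrite earlier entries, which is why the variable-length sets need a synthetic key in this model.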
