Hi!

I have implemented a simple example where the solution set delta depends on
the workset only through a broadcast variable. It works in both the latest
0.6-SNAPSHOT and 0.5.1. Attached is the rendered execution plan, which
shows the workset entering only through a broadcast variable.

Maybe the problem is something different. Can you post your program?

Stephan




My sample program:


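DataSet<Tuple2<Long, Long>> source =
    env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());   // n -> (n, n)

DataSet<Tuple2<Long, Long>> invariantInput =
    env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());

// iteration from here: 'source' is both the initial solution set and the
// initial workset; at most 1000 supersteps; the solution set key is field 1
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter =
    source.iterateDelta(source, 1000, 1);

// the workset enters the step function only as the broadcast set "bc data"
DataSet<Tuple2<Long, Long>> result =
    invariantInput
        .map(new IdentityMapper<Tuple2<Long, Long>>())
        .withBroadcastSet(iter.getWorkset(), "bc data")
        // join with the solution set on its key (field 1)
        .join(iter.getSolutionSet()).where(0).equalTo(1)
        .projectFirst(1).projectSecond(1)
        .types(Long.class, Long.class);

// closeWith(solution set delta, next workset)
iter.closeWith(result.map(new IdentityMapper<Tuple2<Long, Long>>()), result)
    .print();

// compile the plan without statistics (helper from the compiler tests)
OptimizedPlan p = compileNoStats(env.createProgramPlan());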


On Thu, Jul 10, 2014 at 7:28 PM, Stephan Ewen <[email protected]> wrote:

> Hey Jack!
>
> Let me look into this. It should be okay to have the Solution Set Delta
> depend on the Workset via a Broadcast Variable. If the system prohibits
> that, it is not intentional.
>
> If you could provide us with a minimal example that reproduces the error,
> that would be great.
>
> Greetings,
> Stephan
>
>
>
> On Thu, Jul 10, 2014 at 3:25 PM, Vasiliki Kalavri <[email protected]> wrote:
>
>> Hi Jack,
>>
>> regarding the "solution set delta does not depend on the workset" issue:
>> in delta iterations, the state is maintained in the solution set, and the
>> workset serves as the input to the next iteration. The solution set delta
>> represents the changes that need to be merged into the state (the solution
>> set) at the end of each superstep. Therefore, the solution set delta needs
>> to depend on the input (the workset). However, as Fabian already said, this
>> is not detected if you're using a broadcast set. Why don't you provide it as
>> a regular input to your operator?
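The following is a minimal plain-Java model of the superstep semantics described above. It does not involve Flink. The class `DeltaIterationSketch`, its `Step` holder and `exampleStep` are hypothetical illustrations. The model assumes a solution set of `Long` keys mapped to `Long` values and works as follows:

- Each superstep computes a delta and the next workset from the current workset.
- The delta is merged into the solution set by key.
- The loop stops when the workset is empty or the iteration limit is reached.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Single-JVM model of delta-iteration semantics; this is NOT Flink's runtime.
final class DeltaIterationSketch {

    // Output of one superstep: changes for the solution set and the next workset.
    static final class Step {
        final Map<Long, Long> delta;
        final List<Long> nextWorkset;

        Step(Map<Long, Long> delta, List<Long> nextWorkset) {
            this.delta = delta;
            this.nextWorkset = nextWorkset;
        }
    }

    static Map<Long, Long> run(Map<Long, Long> initialSolution,
                               List<Long> initialWorkset,
                               int maxIterations,
                               BiFunction<List<Long>, Map<Long, Long>, Step> step) {
        Map<Long, Long> solution = new HashMap<>(initialSolution);
        List<Long> workset = new ArrayList<>(initialWorkset);
        // Stop at the iteration limit or as soon as the workset runs empty.
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // The delta is derived from the current workset (and may read the solution set).
            Step s = step.apply(workset, solution);
            // Merge by key at the end of the superstep: an existing key is overwritten.
            solution.putAll(s.delta);
            workset = new ArrayList<>(s.nextWorkset);
        }
        return solution;
    }

    // Hypothetical step function (ignores the solution set for brevity): each
    // workset key k contributes (k, k*k) to the delta and, while k < 3,
    // schedules k + 1 for the next superstep.
    static Step exampleStep(List<Long> workset, Map<Long, Long> solution) {
        Map<Long, Long> delta = new HashMap<>();
        List<Long> next = new ArrayList<>();
        for (long k : workset) {
            delta.put(k, k * k);
            if (k < 3) {
                next.add(k + 1);
            }
        }
        return new Step(delta, next);
    }
}
```

Consider a run that starts from the solution set `{0=100}` and the workset `[0]`. With a limit of 10 iterations, `run(..., 10, DeltaIterationSketch::exampleStep)` ends with `{0=0, 1=1, 2=4, 3=9}`. The delta for key 0 overwrites the initial value, which is exactly the keyed merge at the end of a superstep.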
>>
>> Regarding the notion of keys, the delta iteration assumes that each element
>> in the solution set is uniquely identified by a key; this is how the
>> solution set delta is merged with the solution set at the end of a superstep.
>> One solution to your problem might be to create a new random key for each
>> element that you want to add to the solution set. Would that be possible?
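Below is a minimal plain-Java sketch of the random-key suggestion. The class and method names are hypothetical, and a real Flink job would need to do this inside a map function. Each keyless element (here a variable-length `List`) is stored under a fresh random `UUID`. As a result, merging a batch into a key-indexed solution set does not overwrite earlier elements.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper: attach a fresh random key to each keyless element.
final class RandomKeySketch {

    static <T> Map<String, T> withRandomKeys(List<T> elements) {
        Map<String, T> keyed = new HashMap<>();
        for (T element : elements) {
            // Random UUIDs collide with negligible probability, so even equal
            // elements end up under different keys.
            keyed.put(UUID.randomUUID().toString(), element);
        }
        return keyed;
    }
}
```

For example, `withRandomKeys(List.of(List.of(1, 2), List.of(1, 2), List.of(3)))` yields three entries, with both copies of `[1, 2]` kept under different keys. Random UUIDs make collisions extremely unlikely, though not strictly impossible.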
>>
>> Cheers,
>> V.
>>
>>
>>
>>
>>
>> On 7 July 2014 10:53, Fabian Hueske <[email protected]> wrote:
>>
>> > Hi Jack,
>> >
>> > the problem in the other thread was that, for testing purposes, a
>> > collection data source was used as the delta set, so the working set and
>> > the delta set were not connected at all.
>> >
>> > In your program, the two are connected, but only through a broadcast
>> > set, and that connection is not detected by the compiler.
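The detection problem can be illustrated with a toy dependency check. The sketch below is a hypothetical model, not the PACT compiler's actual code. Each node has regular inputs and broadcast inputs, and `dependsOn` walks the inputs backwards from the solution set delta, looking for the workset. If broadcast edges are ignored, a delta that reaches the workset only through a broadcast set is reported as independent.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy data-flow graph (hypothetical; not the PACT/Flink optimizer classes).
final class DependencySketch {

    static final class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();          // regular inputs
        final List<Node> broadcastInputs = new ArrayList<>(); // broadcast variables

        Node(String name) {
            this.name = name;
        }
    }

    // True if 'target' is reachable from 'start' by walking inputs backwards.
    static boolean dependsOn(Node start, Node target, boolean followBroadcast) {
        Set<Node> visited = new HashSet<>();
        List<Node> stack = new ArrayList<>();
        stack.add(start);
        while (!stack.isEmpty()) {
            Node n = stack.remove(stack.size() - 1);
            if (n == target) {
                return true;
            }
            if (!visited.add(n)) {
                continue; // already expanded
            }
            stack.addAll(n.inputs);
            if (followBroadcast) {
                stack.addAll(n.broadcastInputs);
            }
        }
        return false;
    }
}
```

This can be applied to a model of Stephan's sample program: a map takes the invariant input as its regular input and the workset as a broadcast input, and is then joined with the solution set. In that graph, `dependsOn(join, workset, false)` returns `false`, while `dependsOn(join, workset, true)` returns `true`.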
>> >
>> > I am not very familiar with the iterations code, so I leave the tricky
>> > questions for somebody who knows the details.
>> >
>> > Best, Fabian
>> >
>> >
>> >
>> >
>> > 2014-07-06 10:38 GMT+02:00 Jack David Galilee <[email protected]>:
>> >
>> > > Hi,
>> > >
>> > >
>> > > I know from the archive that a similar problem was raised last month
>> > > (http://mail-archives.apache.org/mod_mbox/flink-dev/201406.mbox/browser).
>> > > However, I am unable to tell whether it was ever solved.
>> > >
>> > >
>> > > I am encountering the same problem ("In the given plan, the solution set
>> > > delta does not depend on the workset."), but what I can't ascertain
>> > > (having examined the PACT compiler in 0.5.1) is whether this is a bug or
>> > > an intentional design constraint placed on the delta iteration operator.
>> > >
>> > >
>> > > My algorithm sits between the delta and bulk iterative models as an
>> > > incremental iterative algorithm. The solution set is the union of all
>> > > working sets produced until the current working set is empty.
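The incremental scheme described above can be sketched in plain Java, assuming the step function maps one working set to the next. The class name and the DAG-successor step are hypothetical illustrations, not Jack's actual algorithm. The solution accumulates the union of every working set until the working set is empty or an iteration limit is hit.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Plain-Java sketch: solution = union of all working sets processed so far.
final class UnionIterationSketch {

    static <T> Set<T> run(Set<T> initialWorkset, Function<Set<T>, Set<T>> step, int maxIterations) {
        Set<T> solution = new HashSet<>();
        Set<T> workset = new HashSet<>(initialWorkset);
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            solution.addAll(workset);                  // keep every working set
            workset = new HashSet<>(step.apply(workset));
        }
        return solution;
    }

    // Hypothetical step: the next working set is the set of successors in a DAG.
    static Function<Set<Integer>, Set<Integer>> successors(Map<Integer, List<Integer>> graph) {
        return frontier -> {
            Set<Integer> next = new HashSet<>();
            for (int node : frontier) {
                next.addAll(graph.getOrDefault(node, List.of()));
            }
            return next;
        };
    }
}
```

Take the DAG `1 -> {2, 3}`, `2 -> {4}`, `3 -> {4}`, where node 4 has no successors, and start from `{1}`. The working sets are `{1}`, `{2, 3}`, `{4}` and then empty, so the result is `{1, 2, 3, 4}`. With a cyclic graph, this particular step would never produce an empty working set and would only stop at the iteration limit.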
>> > >
>> > >
>> > > The working set is broadcast to a single operator in the data flow. This
>> > > appears to be the problem: the compiler is unable to detect the
>> > > dependency through this broadcast.
>> > >
>> > >
>> > > To make things more complex, my data does not suit the pseudo-relational
>> > > model Flink is designed around. I am dealing with variable-length sets /
>> > > arrays, so I can't join against the solution set or the working set
>> > > between iterations, because the data has no notion of keys.
>> > >
>> > >
>> > > I can make it 'run' as a BulkIteration, but the result is the final state
>> > > (the empty working set), because the API (at least in 0.5.1) doesn't allow
>> > > all previous steps to be captured in a union. I essentially lose the
>> > > answer once the algorithm converges.
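A small plain-Java model shows why a bulk iteration loses the answer. All names here are hypothetical. A bulk iteration replaces its whole state every superstep, so if the state is just the working set, the final state is the empty set. One possible workaround is to make the state a pair that carries the union of all working sets alongside the current one. This workaround is an assumption of this sketch, not something proposed in this thread.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.UnaryOperator;

// Plain-Java model of a bulk iteration: each superstep REPLACES the state.
final class BulkIterationSketch {

    static <S> S iterate(S initial, UnaryOperator<S> step, int maxIterations) {
        S state = initial;
        for (int i = 0; i < maxIterations; i++) {
            state = step.apply(state);
        }
        return state;
    }

    // Hypothetical working-set step: every x > 1 becomes x - 1; 1 drops out.
    static Set<Integer> shrink(Set<Integer> workset) {
        Set<Integer> next = new HashSet<>();
        for (int x : workset) {
            if (x > 1) {
                next.add(x - 1);
            }
        }
        return next;
    }

    // State that carries the union of all working sets next to the current one.
    static final class Carried {
        final Set<Integer> seen;
        final Set<Integer> current;

        Carried(Set<Integer> seen, Set<Integer> current) {
            this.seen = seen;
            this.current = current;
        }

        Carried next() {
            Set<Integer> union = new HashSet<>(seen);
            union.addAll(current);
            return new Carried(union, shrink(current));
        }
    }
}
```

Start from `{3}` and run 10 supersteps. Iterating `shrink` alone ends with an empty set. Iterating `Carried::next` ends with `seen = {1, 2, 3}` and an empty `current`. The price is that the accumulated union is carried through every superstep.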
>> > >
>> > >
>> > > Your opinion as to whether this is actually a bug, or whether I am doing
>> > > it all completely wrong, would be most appreciated.
>> > >
>> > >
>> > >
>> > > Cheers,
>> > >
>> > > Jack Galilee
>> > >
>> >
>>
>
>
