Addendum: The issue to enforce partitioning of the data set is tracked
here: https://issues.apache.org/jira/browse/FLINK-1060


On Sat, Aug 30, 2014 at 6:01 PM, Stephan Ewen <se...@apache.org> wrote:

> Hey!
>
> Sorry for the late reply, but here are some thoughts in that direction:
>
>
> Stateful operations:
>
> In order to keep state (in the form of a hash map or so) around in
> iterations, you need not use the restricted means that the delta iterations
> give you. All function instances stay around for all supersteps, so they
> can actually just create a map inside that function and update it as you
> like.
>
> There is some example code at the end of the mail (listing 1) or nicely
> formatted) in this gist:
> https://gist.github.com/StephanEwen/7815c1f269f1f79b8e09
>
>
> State across functions:
>
> If you actually want to have state that you access from multiple
> functions, that is also possible. Use a static broker to fetch the map,
> like in listing 2 or this gist:
> https://gist.github.com/StephanEwen/5cdfa628d0e05b99f328
>
>
> Making sure data is partitioned:
>
> The more tricky part is making sure that the data is partitioned when it
> enters the functions. Right now, this needs a hack: A GroupReduce function
> that simply emits everything. This will cause an unnecessary and costly
> sort, so we need to get that out of the way. Until then, it should allow
> you to write a prototype.
>
> The good news is that we are planning to add hints to tell the system to
> partition/rebalance/etc data sets for operations. This would solve the
> issue nicely. It is not a terribly large change, so it should not take too
> long. I'll keep you posted on the progress...
>
> Greetings,
> Stephan
>
>
> ---------------------------------------------------------
> Sample 1: Flexible State in a Mapper
> ---------------------------------------------------------
>
> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
>
>       private final Map<Long, Edge> state = new HashMap<Long, Edge>();
>
>       @Override
>       public void open(Configuration conf) throws Exception {
>                               // load the state
>
>               if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
>                                               String pathToFragment = 
> "hdfs://... or file://...";
>                       CsvInputFormat<Edge> reader = new CsvInputFormat<>(new 
> Path(pathToFragment));
>                       reader.configure(new Configuration());
>                       reader.open(new FileInputSplit(0, new 
> Path(pathToFragment), 0, 36584, null));
>                                               while (!reader.reachedEnd()) {
>                               Edge next = reader.nextRecord(new Edge());
>                               state.put(next.f0, next);
>                       }
>               }
>
>       }
>
>       @Override
>
>       public void close() {
>               // check whether to write the state out
>               if (getIterationRuntimeContext().getSuperstepNumber() == 42) {
>                       // write the state (similar code as in open() for the 
> reader)
>
>                                       }
>       }
>
>
>       @Override
>       public void mapPartition(Iterable<Edge> records, Collector<Edge> out) 
> throws Exception {
>               // do something with the state
>               for (Edge e : records) {
>                       Edge other = state.get(e.f0);
>                       // do something
>                       
>                       state.put(...);
>
>               }
>       }
> }
>
>
> =======================================================
>
> ---------------------------------------------------------
> Sample 2: Sharing state across functions
> ---------------------------------------------------------
>
> public class StateBroker {
>
>   public static final ConcurrentHashMap<Integer, Map<Long, Edge>> BROKER = 
> new ConcurrentHashMap<>();
>               public static Map<Long, Edge> getForSubtask(int subtask) {
>                               Map<Long, Edge> entry = BROKER.get(subtask);
>               if (entry == null) {
>                       entry = new HashMap<>();
>                       Map<Long, Edge> previous = BROKER.putIfAbsent(subtask, 
> entry);
>                       entry = previous == null ? entry : previous;
>               }
>                               return entry;
>       }
> }
>
>
> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
>
>       private Map<Long, Edge> state;
>
>       @Override
>       public void open(Configuration conf) throws Exception {
>                               // load the state
>
>               if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
>                       state = 
> StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask());
>               }
>       }
>
>               ...
> }
>
>
>
> On Thu, Jul 31, 2014 at 4:15 PM, Vasiliki Kalavri <
> vasilikikala...@gmail.com> wrote:
>
>> Hey,
>>
>> thanks for replying so fast :)
>>
>> I saw the discussion in a previous thread concerning changing the API to
>> offer more explicit join functions.
>> I think providing these special functions is a good way to disable any
>> other kind of interaction with the solution set.
>>
>> However, as a user, I would like to have a more flexible way of
>> interacting
>> with the state of the iteration.
>> In the program I'm describing above, I actually want to join on the value
>> of the solution set, not the key.
>> It would be nice to somehow have access to the solution set as any other
>> normal DataSet.
>>
>> I'm not sure how this could be supported, but if you think this is a good
>> idea, I could work on this!
>>
>> Cheers,
>> V.
>>
>>
>>
>> On 31 July 2014 15:50, Stephan Ewen <se...@apache.org> wrote:
>>
>> > Hi Vasia!
>> >
>> > There is no fundamental reason, we simply have not gotten around to
>> > implementing it, yet. Any help along these lines is highly welcome.
>> >
>> > One reason that held us back is that we need to make sure that the key
>> of
>> > the solution set and the key of the join is the same.
>> > That is hard to verify with general functions. One approach is to
>> actually
>> > change the delta iteration API to define the keys only at
>> > one place (the definition of the iteration), and offer special
>> > "joinWithSolution" and "coGroupWithSolution" functions, rather then
>> using
>> > the regular join syntax (which allows to create invalid constructs).
>> >
>> > What are your thoughts on this, from a DeltaIteration user perspective?
>> >
>> > Greetings,
>> > Stephan
>> >
>>
>
>

Reply via email to