Addendum: The issue for enforcing partitioning of data sets is tracked here: https://issues.apache.org/jira/browse/FLINK-1060
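Partitioning matters here because each parallel function instance only keeps state for the keys it receives, so every record with a given key has to reach the same subtask. Below is a minimal, Flink-independent sketch of that routing, assuming records are (key, value) pairs of longs. The class HashRouting and its methods are hypothetical, and the hash-modulo rule is only an illustration. It is not the partitioning function Flink uses internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: key-based routing so that per-subtask state stays consistent.
// The hash-modulo rule below is an illustration, not Flink's internal partitioner.
class HashRouting {

    // Picks the target subtask for a key; floorMod keeps the result non-negative.
    static int targetSubtask(long key, int parallelism) {
        return Math.floorMod(Long.hashCode(key), parallelism);
    }

    // Routes (key, value) records to per-subtask state maps, summing values per key.
    // Because the target depends only on the key, each key lands in exactly one map.
    static List<Map<Long, Long>> route(long[][] records, int parallelism) {
        List<Map<Long, Long>> states = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            states.add(new HashMap<>());
        }
        for (long[] r : records) {
            Map<Long, Long> local = states.get(targetSubtask(r[0], parallelism));
            local.merge(r[0], r[1], Long::sum);
        }
        return states;
    }

    public static void main(String[] args) {
        long[][] records = { {1, 10}, {2, 20}, {1, 5}, {7, 1}, {2, 2} };
        List<Map<Long, Long>> states = route(records, 3);
        for (int i = 0; i < states.size(); i++) {
            System.out.println("subtask " + i + ": " + states.get(i));
        }
    }
}
```

Without such routing (e.g. with round-robin distribution), the two records for key 1 could end up in different subtasks, and neither local map would hold the full state for that key.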
On Sat, Aug 30, 2014 at 6:01 PM, Stephan Ewen <se...@apache.org> wrote:
> Hey!
>
> Sorry for the late reply, but here are some thoughts in that direction:
>
>
> Stateful operations:
>
> In order to keep state (in the form of a hash map or so) around in iterations, you need not use the restricted means that the delta iterations give you. All function instances stay around for all supersteps, so you can simply create a map inside the function and update it as you like.
>
> There is some example code at the end of this mail (listing 1), or nicely formatted in this gist:
> https://gist.github.com/StephanEwen/7815c1f269f1f79b8e09
>
>
> State across functions:
>
> If you want state that you can access from multiple functions, that is also possible. Use a static broker to fetch the map, as in listing 2 or this gist:
> https://gist.github.com/StephanEwen/5cdfa628d0e05b99f328
>
>
> Making sure data is partitioned:
>
> The trickier part is making sure that the data is partitioned by key when it enters the functions. Right now, this needs a hack: a GroupReduce function on the data grouped by that key, which simply emits everything. This causes an unnecessary and costly sort, so we need to get that out of the way. Until then, it should allow you to write a prototype.
>
> The good news is that we are planning to add hints that tell the system to partition/rebalance/etc. data sets for operations. This would solve the issue nicely. It is not a terribly large change, so it should not take too long. I'll keep you posted on the progress...
>
> Greetings,
> Stephan
>
>
> ---------------------------------------------------------
> Sample 1: Flexible State in a Mapper
> ---------------------------------------------------------
>
> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
>
>     // function instances live for all supersteps, so this map keeps its contents between them
>     private final Map<Long, Edge> state = new HashMap<Long, Edge>();
>
>     @Override
>     public void open(Configuration conf) throws Exception {
>         // load the state, once, in the first superstep
>         if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
>             String pathToFragment = "hdfs://... or file://...";
>             CsvInputFormat<Edge> reader = new CsvInputFormat<>(new Path(pathToFragment));
>             reader.configure(new Configuration());
>             // the split covers bytes 0 to 36584 of the file; the length is hard-coded in this sample
>             reader.open(new FileInputSplit(0, new Path(pathToFragment), 0, 36584, null));
>             try {
>                 while (!reader.reachedEnd()) {
>                     Edge next = reader.nextRecord(new Edge());
>                     // nextRecord() may return null, so guard against it
>                     if (next != null) {
>                         state.put(next.f0, next);
>                     }
>                 }
>             } finally {
>                 reader.close();
>             }
>         }
>     }
>
>     @Override
>     public void close() {
>         // check whether to write the state out (here: in superstep 42)
>         if (getIterationRuntimeContext().getSuperstepNumber() == 42) {
>             // write the state (similar code as in open() for the reader)
>         }
>     }
>
>     @Override
>     public void mapPartition(Iterable<Edge> records, Collector<Edge> out) throws Exception {
>         // do something with the state
>         for (Edge e : records) {
>             Edge other = state.get(e.f0);
>             // do something
>             state.put(...);
>         }
>     }
> }
>
>
> =======================================================
>
> ---------------------------------------------------------
> Sample 2: Sharing state across functions
> ---------------------------------------------------------
>
> public class StateBroker {
>
>     public static final ConcurrentHashMap<Integer, Map<Long, Edge>> BROKER =
>             new ConcurrentHashMap<>();
>
>     public static Map<Long, Edge> getForSubtask(int subtask) {
>         Map<Long, Edge> entry = BROKER.get(subtask);
>         if (entry == null) {
>             // a concurrent map, since functions sharing a subtask index may run in different threads
>             entry = new ConcurrentHashMap<>();
>             // putIfAbsent returns the existing map if another function registered one first
>             Map<Long, Edge> previous = BROKER.putIfAbsent(subtask, entry);
>             entry = previous == null ? entry : previous;
>         }
>         return entry;
>     }
> }
>
>
> public static class MyMapper extends RichMapPartitionFunction<Edge, Edge> {
>
>     private Map<Long, Edge> state;
>
>     @Override
>     public void open(Configuration conf) throws Exception {
>         // load the state: fetch the shared map once; the field keeps it for later supersteps
>         if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
>             state = StateBroker.getForSubtask(getRuntimeContext().getIndexOfThisSubtask());
>         }
>     }
>
>     ...
> }
>
>
>
> On Thu, Jul 31, 2014 at 4:15 PM, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
>
>> Hey,
>>
>> thanks for replying so fast :)
>>
>> I saw the discussion in a previous thread about changing the API to offer more explicit join functions.
>> I think providing these special functions is a good way to rule out any other kind of interaction with the solution set.
>>
>> However, as a user, I would like a more flexible way of interacting with the state of the iteration.
>> In the program I'm describing above, I actually want to join on the value of the solution set, not the key.
>> It would be nice to somehow have access to the solution set like any other normal DataSet.
>>
>> I'm not sure how this could be supported, but if you think this is a good idea, I could work on it!
>>
>> Cheers,
>> V.
>>
>>
>>
>> On 31 July 2014 15:50, Stephan Ewen <se...@apache.org> wrote:
>>
>> > Hi Vasia!
>> >
>> > There is no fundamental reason; we simply have not gotten around to implementing it yet. Any help along these lines is highly welcome.
>> >
>> > One reason that held us back is that we need to make sure that the key of the solution set and the key of the join are the same.
>> > That is hard to verify with general functions.
>> > One approach is to change the delta iteration API to define the keys in only one place (the definition of the iteration), and to offer special "joinWithSolution" and "coGroupWithSolution" functions rather than using the regular join syntax (which allows invalid constructs to be created).
>> >
>> > What are your thoughts on this, from a DeltaIteration user perspective?
>> >
>> > Greetings,
>> > Stephan
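The StateBroker in Sample 2 combines get and putIfAbsent to create each per-subtask map lazily and share it between functions. Assuming Java 8 or later, the same idea fits into a single atomic ConcurrentHashMap.computeIfAbsent call. The sketch below is Flink-independent: SubtaskStateBroker is a hypothetical name, and String values stand in for the Edge type used in the thread. The inner maps are concurrent as well, on the assumption that functions sharing a subtask index may run in different threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, Flink-independent sketch of the static state broker idea.
// String values stand in for the Edge type, which is not defined here.
class SubtaskStateBroker {

    // subtask index -> that subtask's shared state map
    private static final ConcurrentHashMap<Integer, Map<Long, String>> BROKER =
            new ConcurrentHashMap<>();

    private SubtaskStateBroker() {}

    // Returns the state map for a subtask, creating it on first access.
    // computeIfAbsent is atomic on ConcurrentHashMap, so concurrent callers
    // asking for the same subtask index always receive the same map instance.
    static Map<Long, String> getForSubtask(int subtask) {
        return BROKER.computeIfAbsent(subtask, i -> new ConcurrentHashMap<>());
    }

    public static void main(String[] args) {
        // two different functions running as subtask 3 see the same map
        Map<Long, String> seenByMapper = getForSubtask(3);
        Map<Long, String> seenByJoiner = getForSubtask(3);
        seenByMapper.put(1L, "edge-1");
        System.out.println(seenByJoiner.get(1L));             // prints edge-1
        // another subtask index gets its own, separate map
        System.out.println(getForSubtask(4).containsKey(1L)); // prints false
    }
}
```

Because BROKER is static, the maps stay around as long as the class is loaded, so clearing them once they are no longer needed is up to the user code.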