Hi Dmitriy,

I’m not sure whether I’ve understood your question correctly, so please correct me if I’m wrong.
So you’re asking whether it is a problem that

stat1 = A.map.reduce
A = A.update.map(stat1)

are executed on the same input data set A, and whether we have to cache A for that, right? I assume you’re worried that A is calculated twice.

Since you don’t have an API call which triggers eager execution of the data flow, the map.reduce and map(stat1) calls will only construct the data flow of your program. Both operators depend on the result of A, which is calculated only once (when execute, collect or count is called) and then sent to the map.reduce and map(stat1) operators.

However, it is not recommended to use an explicit loop for iterative computations with Flink. The problem is that you basically unroll the loop and construct a long pipeline containing the operations of every iteration. Once you execute this long pipeline, you will face considerable memory fragmentation, because every operator gets a proportional fraction of the available memory assigned. Even worse, if you trigger the execution of your data flow to evaluate the convergence criterion, you will execute, for each iteration, the complete pipeline which has been built up so far. Thus, you end up with a complexity that is quadratic in the number of iterations.

Therefore, I would highly recommend using Flink’s built-in support for native iterations, which does not suffer from this problem, or at least materializing the intermediate result every n iterations. At the moment this would mean writing the data to some sink and then reading it from there again.

I hope this answers your question. If not, then don’t hesitate to ask me again.

Cheers,
Till

On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello Dmitriy,
>
> If I understood correctly, what you are basically talking about is
> modifying a DataSet as you iterate over it.
>
> AFAIK this is currently not possible in Flink, and indeed it's a real
> bottleneck for ML algorithms.
> This is the reason our current SGD implementation does a pass over the
> whole dataset at each iteration, since we cannot take a sample from the
> dataset and iterate only over that (so it's not really stochastic).
>
> The relevant JIRA is here:
> https://issues.apache.org/jira/browse/FLINK-2396
>
> I would love to start a discussion on how we can proceed to fix this.
>
> Regards,
> Theodore
>
> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
> wrote:
>
> > Hi,
> >
> > probably more of a question for Till:
> >
> > Imagine a common ML algorithm flow that runs until convergence.
> >
> > A typical distributed flow would be something like this (e.g. GMM EM
> > would be exactly like that):
> >
> > A: input
> >
> > do {
> >
> >   stat1 = A.map.reduce
> >   A = A.update-map(stat1)
> >   conv = A.map.reduce
> > } until conv > convThreshold
> >
> > There probably could be one map-reduce step originating on A to compute
> > both the convergence criterion statistics and the update statistics in
> > one step. Not the point.
> >
> > The point is that the update and the map.reduce originate on the same
> > dataset intermittently.
> >
> > In Spark we would normally commit A to an object tree cache so that the
> > data is available to subsequent map passes without any I/O or
> > serialization operations, thus ensuring a high rate of iterations.
> >
> > We observe the same pattern pretty much everywhere: clustering,
> > probabilistic algorithms, even batch gradient descent or quasi-Newton
> > algorithm fitting.
> >
> > How do we do something like that, for example, in FlinkML?
> >
> > Thoughts?
> >
> > thanks.
> >
> > -Dmitriy
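Till's quadratic-complexity warning can be made concrete with a toy model. The sketch below is plain Python and deliberately NOT the Flink API; the `Op` class and every name in it are invented for illustration. It mimics a lazy pipeline where triggering execution (e.g. for a convergence check) re-runs the entire upstream chain, as happens when an explicit loop is unrolled into one long data flow:

```python
class Op:
    """Toy lazy operator (invented for illustration, NOT the Flink API).

    Each operator recomputes its entire upstream chain on every execution,
    mimicking a data flow pipeline that was unrolled by an explicit loop.
    """
    def __init__(self, fn, upstream=None):
        self.fn = fn
        self.upstream = upstream
        self.executions = 0  # how many times this operator actually ran

    def execute(self):
        self.executions += 1
        data = self.upstream.execute() if self.upstream else None
        return self.fn(data)

# The base "dataset" A: every execution counts as one pass over the input.
source = Op(lambda _: list(range(10)))

updates = []
pipeline = source
for _ in range(5):  # five iterations of the explicit loop
    # One update step per iteration, chained onto everything built so far.
    pipeline = Op(lambda data: [x + 1 for x in data], upstream=pipeline)
    updates.append(pipeline)
    # Evaluating the convergence criterion forces execution of the whole
    # pipeline built up so far, including all earlier update steps.
    pipeline.execute()

print(source.executions)                     # the input was read 5 times
print(sum(op.executions for op in updates))  # 5 + 4 + 3 + 2 + 1 = 15 runs
```

For 5 iterations the input is re-read 5 times and the update steps run 15 times in total, so the total work grows quadratically with the iteration count; this is exactly the effect Till's recommendation of native iterations (or periodic materialization) avoids.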