Hi Dmitriy,

I’m not sure whether I’ve understood your question correctly, so please
correct me if I’m wrong.

So you’re asking whether it is a problem that

stat1 = A.map.reduce
A = A.update.map(stat1)

are executed on the same input data set A and whether we have to cache A
for that, right? I assume you’re worried that A is calculated twice.

Since you don’t have an API call which triggers eager execution of the data
flow, the map.reduce and map(stat1) calls will only construct the data flow
of your program. Both operators will depend on the result of A, which is
calculated only once (when execute, collect or count is called) and then
sent to both the map.reduce and the map(stat1) operator.
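
To make that concrete, here is a minimal sketch with the Scala DataSet API
(the input data and the file paths are made-up placeholders, not your
actual program):

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object LazyDataflowSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // A is just a node in the data flow graph; nothing is computed here.
    // Placeholder data -- in your program A would be the real input.
    val A: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0, 4.0)

    // Both operators hang off the same source A; still nothing is computed.
    val stat1   = A.map(x => x * x).reduce(_ + _)   // "map.reduce"
    val updated = A.map(x => x + 1.0)               // "update-map" (stat1 could be broadcast in)

    stat1.writeAsText("/tmp/stat1", WriteMode.OVERWRITE)
    updated.writeAsText("/tmp/updated", WriteMode.OVERWRITE)

    // Only now is the data flow executed: A is evaluated once and its
    // records are streamed to both downstream operators.
    env.execute("lazy dataflow sketch")
  }
}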

However, it is not recommended to use an explicit loop for iterative
computations with Flink. The problem is that you basically unroll the loop
and construct one long pipeline containing the operations of every
iteration. Once you execute this long pipeline you will face considerable
memory fragmentation, because every operator gets assigned a proportional
fraction of the available memory. Even worse, if you trigger the execution
of your data flow to evaluate the convergence criterion, you will
re-execute for each iteration the complete pipeline which has been built up
so far. Thus, you end up with quadratic complexity in the number of
iterations. Therefore, I would highly recommend using Flink’s built-in
support for native iterations, which does not suffer from this problem, or
at least materializing the intermediate result every n iterations. At the
moment this would mean writing the data to some sink and then reading it
back from there.
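
For illustration, here is a rough sketch of how your loop could be
expressed as a native bulk iteration with the Scala DataSet API; the update
rule and the convergence check are made-up placeholders:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._

object NativeIterationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Placeholder input; in practice A would come from a real source.
    val A: DataSet[Double] = env.fromElements(1.0, 2.0, 3.0, 4.0)

    val maxIterations = 100
    val result = A.iterateWithTermination(maxIterations) { current =>
      // stat1 = A.map.reduce (identity map + sum as a stand-in statistic)
      val stat1 = current.map(x => x).reduce(_ + _)

      // A = A.update-map(stat1): stat1 is shipped to the mappers as a broadcast set
      val next = current
        .map(new RichMapFunction[Double, Double] {
          override def map(x: Double): Double = {
            val s = getRuntimeContext.getBroadcastVariable[Double]("stat1").get(0)
            x / s // made-up update rule, only for illustration
          }
        })
        .withBroadcastSet(stat1, "stat1")

      // conv = A.map.reduce: the iteration stops as soon as this DataSet is empty
      val conv = next
        .reduce(_ + _)
        .filter(s => math.abs(s - 1.0) > 1e-9) // made-up convergence check

      (next, conv)
    }

    result.print()
  }
}

The termination DataSet is evaluated per superstep within a single job, so
the pipeline is not rebuilt and re-executed for every iteration.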

I hope this answers your question. If not, then don’t hesitate to ask me
again.

Cheers,
Till

On Wed, Mar 23, 2016 at 10:19 AM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> Hello Dmitriy,
>
> If I understood correctly, you are basically talking about modifying a
> DataSet as you iterate over it.
>
> AFAIK this is currently not possible in Flink, and indeed it's a real
> bottleneck for ML algorithms. This is the reason our current
> SGD implementation does a pass over the whole dataset at each iteration,
> since we cannot take a sample from the dataset
> and iterate only over that (so it's not really stochastic).
>
> The relevant JIRA is here:
> https://issues.apache.org/jira/browse/FLINK-2396
>
> I would love to start a discussion on how we can proceed to fix this.
>
> Regards,
> Theodore
>
> On Tue, Mar 22, 2016 at 9:56 PM, Dmitriy Lyubimov <dlie...@gmail.com>
> wrote:
>
> > Hi,
> >
> > probably more of a question for Till:
> >
> > Imagine a common ML algorithm flow that runs until convergence.
> >
> > A typical distributed flow would be something like this (e.g. GMM EM
> > would be exactly like that):
> >
> > A: input
> >
> > do {
> >
> >    stat1 = A.map.reduce
> >    A = A.update-map(stat1)
> >    conv = A.map.reduce
> > } until conv > convThreshold
> >
> > There could probably be one map-reduce step originating on A to compute
> > both convergence criteria statistics and update statistics in one step;
> > not the point.
> >
> > The point is that update and map.reduce originate on the same dataset
> > intermittently.
> >
> > In Spark we would normally commit A to an object tree cache so that the
> > data is available to subsequent map passes without any I/O or
> > serialization operations, thus ensuring a high rate of iterations.
> >
> > We observe the same pattern pretty much everywhere: clustering,
> > probabilistic algorithms, even batch gradient descent or quasi-Newton
> > algorithm fitting.
> >
> > How do we do something like that, for example, in FlinkML?
> >
> > Thoughts?
> >
> > thanks.
> >
> > -Dmitriy
> >
>
