I think that this is a fairly delicate thing.

The execution graph / scheduling is the most delicate part of the system. I
would not feel comfortable with a quick fix there, so let's think this
through a little bit.

The logic currently does the following:

1) It schedules the sources (see "ExecutionGraph.scheduleForExecution()")

2) Successors of operators are scheduled when the intermediate result
partition / queue tells the master that data is available.

3) The successor requests the stream from the producer.


Possible changes:
 - We could definitely change the "ExecutionGraph.scheduleForExecution()"
method to deploy all tasks immediately. I would suggest attaching a "schedule
mode" to the JobGraph that defines how to do that. The mode could have the
values FROM_SOURCES, ALL, and BACK_TRACKING: FROM_SOURCES is what we do
right now, BACK_TRACKING is what we will do in the next release, and ALL is
what you need.

 - When all tasks are scheduled immediately, it may be that for a channel,
the sender is not yet deployed when the receiver is deployed. That should
be okay, since the same can happen right now when all-to-all patterns
connect the tasks.

 - The queues would still send notifications to the JobManager that data is
available, but the JM will see that the target task is already deployed (or
currently being deployed). In that case, the JM only needs to send the task
the information about where to fetch the channel from. That mechanism also
exists already.
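
To make the schedule mode idea a bit more concrete, here is a rough Java
sketch. It is not the actual ExecutionGraph code: ScheduleMode, TaskVertex,
SchedulingSketch, onDataAvailable and the two returned strings are names I
made up for illustration, and BACK_TRACKING is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this only illustrates the proposal.
enum ScheduleMode { FROM_SOURCES, ALL, BACK_TRACKING }

final class TaskVertex {
    final String name;
    final boolean isSource;
    boolean deployed;

    TaskVertex(String name, boolean isSource) {
        this.name = name;
        this.isSource = isSource;
    }
}

final class SchedulingSketch {

    // Returns the names of the tasks deployed right away for the given mode.
    static List<String> scheduleForExecution(List<TaskVertex> vertices, ScheduleMode mode) {
        if (mode == ScheduleMode.BACK_TRACKING) {
            throw new UnsupportedOperationException("BACK_TRACKING is not sketched here");
        }
        List<String> deployedNow = new ArrayList<>();
        for (TaskVertex v : vertices) {
            // ALL deploys every task; FROM_SOURCES deploys only the sources.
            if (mode == ScheduleMode.ALL || v.isSource) {
                v.deployed = true;
                deployedNow.add(v.name);
            }
        }
        return deployedNow;
    }

    // What the master would do when a producer reports that data is available.
    static String onDataAvailable(TaskVertex consumer) {
        if (consumer.deployed) {
            // Already deployed: only tell it where to fetch the channel from.
            return "UPDATE_CONSUMER";
        }
        // Not deployed yet: deploy it now (the current lazy behaviour).
        consumer.deployed = true;
        return "SCHEDULE_CONSUMER";
    }
}
```

With ALL, every "data available" notification ends up in the update branch,
which is the case described in the last bullet above.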


@Ufuk: It seems that it may actually work to simply kick off the deployment
of all tasks immediately (in the "ExecutionGraph.scheduleForExecution()"
method). Do you see any other implications?

Greetings,
Stephan


On Wed, Jan 21, 2015 at 6:50 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Gyula,
>
> On 21 Jan 2015, at 15:41, Gyula Fóra <gyf...@apache.org> wrote:
>
> > Hey Guys,
> >
> > I think it would make sense to turn lazy operator execution off for
> > streaming programs because it would make life simpler for windowing.  I
> > also created a JIRA issue here
> > <https://issues.apache.org/jira/browse/FLINK-1425>.
> >
> > Can anyone give me some quick pointers how to do this? It's probably
> > simple, I am just not familiar with that part of the code. (Or maybe it's
> > so easy that someone could pick this up :) )
>
> Have a look at the JobManager ScheduleOrUpdateConsumers message, which is
> how it is done currently. The first produced buffer of an intermediate
> result triggers this message. I think the cleanest solution would be to do
> this directly when scheduling a streaming job?
>
> > By the way, do you see any reasons why we should not do this?
>
> ATM, I don't.
