I'm not sure whether it is currently possible to schedule the receiver
first and then the sender. Recently, I had to fix the
TaskManagerTest.testRunWithForwardChannel test case, where exactly this
happened. Because the receiver was scheduled first, the method
IntermediateResultPartitionManager.getIntermediateResultPartitionIterator
sometimes threw an IllegalQueueIteratorRequestException. The partition
manager complained that the producer execution ID was unknown. I assume
this has to be fixed first before we can schedule all tasks immediately.
But Ufuk will probably know better.
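
To make the "schedule mode" proposal quoted below a bit more concrete, here
is a minimal sketch of how such a mode could select the tasks that get
deployed up front. Every name in it (ScheduleModeSketch, Task,
initialDeployment) is hypothetical and not taken from the Flink code base;
only the three mode values come from the quoted mail. BACK_TRACKING is
deliberately left out because its behavior is not described in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these names are taken from the Flink code base.
final class ScheduleModeSketch {

    // The three mode values proposed in the quoted mail.
    enum ScheduleMode { FROM_SOURCES, ALL, BACK_TRACKING }

    // A task is reduced to a name and the number of inputs it consumes.
    static final class Task {
        final String name;
        final int numInputs;

        Task(String name, int numInputs) {
            this.name = name;
            this.numInputs = numInputs;
        }

        // A source is a task without any inputs.
        boolean isSource() {
            return numInputs == 0;
        }
    }

    // Returns the names of the tasks that would be deployed up front.
    static List<String> initialDeployment(ScheduleMode mode, List<Task> tasks) {
        List<String> deployed = new ArrayList<>();
        for (Task task : tasks) {
            switch (mode) {
                case ALL:
                    // Deploy everything immediately, receivers included.
                    deployed.add(task.name);
                    break;
                case FROM_SOURCES:
                    // Deploy only sources; successors would follow once data is available.
                    if (task.isSource()) {
                        deployed.add(task.name);
                    }
                    break;
                default:
                    // BACK_TRACKING is not specified in this thread, so it is not sketched.
                    throw new UnsupportedOperationException(mode + " is not sketched here");
            }
        }
        return deployed;
    }
}
```

For a source -> map -> sink pipeline, FROM_SOURCES would deploy only the
source up front, while ALL would deploy all three tasks. The receiver-first
problem described above is exactly what the ALL case would have to cope with.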

Greets,

Till

On Wed, Jan 21, 2015 at 8:58 PM, Stephan Ewen <[email protected]> wrote:

> I think that this is a fairly delicate thing.
>
> The execution graph / scheduling is the most delicate part of the system. I
> would not feel too well about a quick fix there, so let's think this
> through a little bit.
>
> The logic currently does the following:
>
> 1) It schedules the sources (see "ExecutionGraph.scheduleForExecution()")
>
> 2) Successors of operators are scheduled when the intermediate result
> partition / queue tells the master that data is available.
>
> 3) The successor requests the stream from the producer.
>
>
> Possible changes:
>  - We could definitely change the "ExecutionGraph.scheduleForExecution()"
> method to deploy all tasks immediately. I would suggest having a "schedule
> mode" attached to the JobGraph that defines how to do that. The mode could
> have the values FROM_SOURCES, ALL, and BACK_TRACKING. FROM_SOURCES is what
> we do right now, BACK_TRACKING is what we will do in the next release, and
> ALL is what you need.
>
>  - When all tasks are scheduled immediately, it may be that for a channel,
> the sender is not yet deployed when the receiver is deployed. That should
> be okay, since the same can happen right now when all-to-all patterns
> connect the tasks.
>
>  - The queues would still send notifications to the JobManager that data is
> available, but the JM will see that the target task is already deployed (or
> currently being deployed). Then the info where to grab a channel from would
> need to be sent to the task. That mechanism also exists already.
>
>
> @Ufuk: It seems that it may actually work to simply kick off the deployment
> of all tasks immediately (in the "ExecutionGraph.scheduleForExecution()"
> method). Do you see any other implications?
>
> Greetings,
> Stephan
>
>
> On Wed, Jan 21, 2015 at 6:50 AM, Ufuk Celebi <[email protected]> wrote:
>
> > Hey Gyula,
> >
> > On 21 Jan 2015, at 15:41, Gyula Fóra <[email protected]> wrote:
> >
> > > Hey Guys,
> > >
> > > I think it would make sense to turn lazy operator execution off for
> > > streaming programs because it would make life simpler for windowing.  I
> > > also created a JIRA issue here
> > > <https://issues.apache.org/jira/browse/FLINK-1425>.
> > >
> > > Can anyone give me some quick pointers how to do this? It's probably
> > > simple, I am just not familiar with that part of the code. (Or maybe
> > > it's so easy that someone could pick this up :) )
> >
> > Have a look at the JobManager ScheduleOrUpdateConsumers message, which is
> > how it is done currently. The first produced buffer of an intermediate
> > result triggers this message. I think the cleanest solution would be to
> > do this directly when scheduling a streaming job?
> >
> > > By the way, do you see any reasons why we should not do this?
> >
> > ATM, I don't.
>
