Hi Fabian,

I see, thanks for the quick explanation.

Cheers,

Konstantin


On 04.01.2017 14:15, Fabian Hueske wrote:
> Hi Konstantin,
> 
> the DataSet API tries to execute all operators as soon as possible.
> 
> I assume that in your case, Flink does not do this because it tries to
> avoid a deadlock.
> A dataflow that replicates data from the same source and joins it
> again can deadlock, because all pipelines need to make progress in
> order for the source to finish.
> 
> Think of a simple example like this:
> 
>            /-- Map1 --\
> Src --<                  >-Join
>            \-- Map2 --/
> 
> If the join is executed as a hash join, one input (Map1) is used to
> build a hash table. Only once the hash table is built, the other input
> (Map2) can be consumed.
> If both Map operators ran at the same time, Map2 would stall at some
> point because it could not emit any more data due to the backpressure
> from the not-yet-opened probe input of the hash join.
> Once Map2 stalls, the Source would stall and Map1 could not continue to
> finish the build side. At this point we have a deadlock.
> 
> Flink detects these situations and adds an artificial pipeline breaker
> in the dataflow to prevent deadlocks. Due to the pipeline breaker, the
> build side is completed before the probe side input is processed.
> 
> This also answers the question of which operator is executed first:
> the operator on the build side of the first join. Hence, the join
> strategy chosen by the optimizer (BUILD_FIRST, BUILD_SECOND) decides.
> You can also give a manual JoinHint to control this. With a
> SORT_MERGE hint, all three operators should run concurrently because
> both join inputs are consumed concurrently for sorting.
> 
> Best, Fabian
> 
> 
> 2017-01-04 13:30 GMT+01:00 Konstantin Knauf
> <konstantin.kn...@tngtech.com>:
> 
>     Hi everyone,
> 
>     I have a basic question regarding scheduling of batch programs. Let's
>     take the following graph:
> 
>               -> Group Combine -> ...
>             /
>     Source ----> Group Combine -> ...
>             \
>               -> Map -> ...
> 
>     So, a source followed by three operators with ship strategy
>     "Forward" and exchange mode "pipelined".
> 
>     The three flows are later joined again, so that this results in a single
>     job.
> 
>     When the job is started, at first only one of the operators
>     immediately receives the input read by the source and can therefore
>     run concurrently with the source. Once the source is finished, the
>     other two operators are scheduled.
> 
>     Two questions about this:
> 
>     1) Why doesn't the source forward the records to all three operators
>     while still running?
>     2) How does the jobmanager decide which of the three operators
>     receives the pipelined data first?
> 
>     Cheers and Thanks,
> 
>     Konstantin
> 
> 
>     --
>     Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
> 
> 
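
To make the deadlock scenario above concrete, here is a self-contained toy
model in plain Java (not Flink code; the class name, buffer sizes, and
structure are made up for illustration). A small bounded queue stands in for
a pipelined channel with backpressure, and an unbounded queue stands in for
the artificial pipeline breaker that materializes the probe side while the
hash table is being built:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the diamond dataflow: one source feeds the build side
// (Map1) and the probe side (Map2) of a hash join through buffers.
// The join drains the build side completely first (BUILD_FIRST), then
// consumes the probe side. If the probe-side buffer were also small
// and bounded, the source would block on it mid-run and deadlock; the
// unbounded buffer plays the role of Flink's pipeline breaker.
public class PipelineBreakerDemo {

    static int runJoin(int n) throws InterruptedException {
        // Bounded channel: backpressure on the build side.
        BlockingQueue<Integer> buildSide = new ArrayBlockingQueue<>(4);
        // "Pipeline breaker": materializes the probe side so the
        // source never blocks while the hash table is being built.
        BlockingQueue<Integer> probeSide = new LinkedBlockingQueue<>();

        Thread source = new Thread(() -> {
            for (int i = 0; i < n; i++) {
                try {
                    buildSide.put(i); // Map1 path
                    probeSide.put(i); // Map2 path
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        source.start();

        // Phase 1: build the hash table from the build side.
        Map<Integer, Boolean> hashTable = new HashMap<>();
        for (int i = 0; i < n; i++) {
            hashTable.put(buildSide.take(), Boolean.TRUE);
        }

        // Phase 2: probe with the second input.
        int matches = 0;
        for (int i = 0; i < n; i++) {
            if (hashTable.containsKey(probeSide.take())) {
                matches++;
            }
        }
        source.join();
        return matches;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("matches: " + runJoin(100)); // prints "matches: 100"
    }
}
```

Swapping the LinkedBlockingQueue for another ArrayBlockingQueue(4) reproduces
the deadlock: the source blocks on the full probe buffer while the join waits
on the now-empty build buffer.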

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
