[ 
https://issues.apache.org/jira/browse/FLINK-20038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17232495#comment-17232495
 ] 

Jin Xing edited comment on FLINK-20038 at 11/16/20, 3:56 AM:
-------------------------------------------------------------

Hi [~trohrmann] [~ym] 
 Thanks a lot for your feedback, and sorry for the late reply; I was busy 
supporting the 11.11 shopping festival.

We indeed need a proper design for what we want to support and how it maps to 
properties. The characteristics of a shuffle manner are exposed through 
ResultPartitionType.

We should sort out which properties should be exposed to the scheduling layer 
and respected by it, and which are implementation details that should stay 
internal to the shuffle implementation.

From my side, I think three kinds of properties should be exposed to the 
scheduling layer:
 # *_+Writing property+_* – – how the shuffle service can be written to. This 
is what '_hasBackPressure_' indicates, but it seems to be used nowhere and is 
blurred with '_isPipelined_'. The current code assumes that a shuffle with 
'_isPipelined=true_' always has '_hasBackPressure=true_', which is not true 
conceptually.
 # *_+Reading property+_* – – when the shuffle data is readable. This is what 
'_isPipelined_' and '_isBlocking_' indicate. They are used when dividing a job 
into PipelinedRegions. My concern is how we define a PipelinedRegion 
concisely. From my understanding, a PipelinedRegion is a set of vertices that 
must be scheduled together because of back pressure, and whose internal data 
flow can be consumed before the producing tasks finish. If my understanding is 
correct, then when judging whether two vertices belong to the same region, 
should the condition be '_isPipelined=true && hasBackPressure=true_' rather 
than the current implementation in ([PipelinedRegionComputeUtil|#L158])  
(_producedResult.getResultType().isPipelined()_)?
 # *+_Data lifecycle_+* – – when the shuffle data can be garbage collected: 
(a) after the data is consumed; (b) after the job finishes; or (c) by manual 
recycling.

If the above classification is valid, we need to rectify some misuse in the 
scheduling layer and fully respect the shuffle properties.
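
To make the classification concrete, here is a minimal sketch in Java. 
ShuffleProperties, ReleaseTrigger and mustShareRegionWithConsumer are 
hypothetical names used only for illustration; they are not existing Flink 
classes or methods, and modelling isBlocking as the negation of isPipelined is 
an assumption of the sketch.

```java
/** Hypothetical descriptor of the three property kinds; not an existing Flink class. */
final class ShuffleProperties {

    /** Data lifecycle: what triggers the release of shuffle data (illustrative names). */
    enum ReleaseTrigger { ON_CONSUMPTION, ON_JOB_FINISHED, MANUAL }

    private final boolean hasBackPressure;       // writing property
    private final boolean isPipelined;           // reading property
    private final ReleaseTrigger releaseTrigger; // data lifecycle

    ShuffleProperties(boolean hasBackPressure, boolean isPipelined, ReleaseTrigger releaseTrigger) {
        this.hasBackPressure = hasBackPressure;
        this.isPipelined = isPipelined;
        this.releaseTrigger = releaseTrigger;
    }

    boolean hasBackPressure() { return hasBackPressure; }

    boolean isPipelined() { return isPipelined; }

    // Assumption of this sketch: blocking is modelled as the opposite of pipelined.
    boolean isBlocking() { return !isPipelined; }

    ReleaseTrigger releaseTrigger() { return releaseTrigger; }

    /** Region condition raised in the question above: pipelined AND back-pressured. */
    boolean mustShareRegionWithConsumer() { return isPipelined && hasBackPressure; }
}
```

With such a descriptor, a disk-based shuffle that is readable while being 
written (isPipelined=true, hasBackPressure=false) would not force producer and 
consumer into the same region, while a classic pipelined shuffle would.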

 


> Rectify the usage of ResultPartitionType#isPipelined() in partition tracker.
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20038
>                 URL: https://issues.apache.org/jira/browse/FLINK-20038
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination, Runtime / Network
>            Reporter: Jin Xing
>            Priority: Major
>
> After "FLIP-31: Pluggable Shuffle Service", users can implement and plug in 
> a new shuffle manner to benefit different scenarios. A new shuffle manner 
> tends to bring new capabilities that the scheduling layer could leverage to 
> provide better performance.
> From my understanding, the characteristics of a shuffle manner are exposed 
> by ResultPartitionType (e.g. isPipelined, isBlocking, hasBackPressure ...) 
> and leveraged by the scheduling layer to run the job. But it seems that 
> Flink doesn't provide a way to describe the new characteristics of a 
> plugged-in shuffle manner. I also find that the scheduling layer makes some 
> weak assumptions about ResultPartitionType. I detail this with the example 
> below.
> In our internal Flink, we developed a new shuffle manner for batch jobs. 
> Its characteristics can be summarized briefly as follows:
> 1. The upstream task writes shuffle data to DISK;
> 2. The upstream task commits data while producing and notifies downstream 
> that the data is "consumable" BEFORE the task finishes;
> 3. Downstream is notified when upstream data is consumable and can be 
> scheduled according to available resources;
> 4. When a downstream task fails over, only that task needs to be restarted, 
> because the upstream data is written to disk and is replayable.
> We can describe the characteristics of this new shuffle manner as:
> a. isPipelined=true – the downstream task can consume data before the 
> upstream task finishes;
> b. hasBackPressure=false – the upstream task writes shuffle data to disk and 
> can finish on its own, regardless of whether a downstream task consumes the 
> data in time.
> But the above new ResultPartitionType(isPipelined=true, 
> hasBackPressure=false) seems to contradict the partition lifecycle 
> management in the current scheduling layer:
> 1. The above new shuffle manner needs the partition tracker for lifecycle 
> management, but current Flink assumes that ALL "isPipelined=true" result 
> partitions are released on consumption and will not be taken care of by the 
> partition tracker 
> ([link|https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java#L66])
>  – this assumption is not correct for this case.
> From my understanding, ResultPartitionType#isPipelined() indicates whether 
> data can be consumed while it is being produced, and it is orthogonal to 
> whether the partition is released on consumption. I propose to fix this and 
> fully respect the original meaning of ResultPartitionType#isPipelined().
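
The orthogonality described in the issue can be sketched as follows. This is 
an illustration under stated assumptions, not the code in 
JobMasterPartitionTrackerImpl: PartitionTrackingSketch and both methods are 
hypothetical names, and "tracked" here simply means the partition tracker is 
responsible for releasing the partition.

```java
/** Hypothetical helper contrasting the two tracking rules; not Flink code. */
final class PartitionTrackingSketch {

    /**
     * Assumption described in the issue: every pipelined partition is released
     * on consumption, so only non-pipelined partitions are tracked.
     */
    static boolean trackedUnderCurrentAssumption(boolean isPipelined) {
        return !isPipelined;
    }

    /**
     * Decoupled reading: tracking depends only on the release behaviour.
     * The isPipelined argument is deliberately ignored to show the orthogonality.
     */
    static boolean trackedWhenDecoupled(boolean isPipelined, boolean releasedOnConsumption) {
        return !releasedOnConsumption;
    }
}
```

For the disk-based shuffle in the issue (isPipelined=true, not released on 
consumption), the first rule leaves the partition untracked while the second 
hands it to the partition tracker.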



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
