Hi Zhijiang,

I see there’s a YarnShuffleService in newly released Blink branch. Is there any 
relationship between that YarnShuffleService and  your external shuffle service?

Regards,
Qi

> On Jan 28, 2019, at 8:07 PM, zhijiang <wangzhijiang...@aliyun.com.INVALID> 
> wrote:
> 
> Hi till,
> 
> Very glad to receive your feedbacks and it is atually very helpful.
> 
> The proposed ShuffleMaster in JM would be involved in many existing 
> processes, such as task deployment, task failover, TM release, so it might be 
> interactive with corresponding Scheduler, FailoverStrategy, SlotPool 
> components. In the first version we try to focus on deploying process which 
> is described in detail in the FLIP. Concerning the other improvements based 
> on the proposed architecuture, we just mentioned the basic ideas and have not 
> given the whole detail process. But I think it is reasonable and natural to 
> solve these issues based on that. And we would further give more details for 
> other future steps.
> 
> I totally agree with your thought of handling TM release. Currently once the 
> task is finished, the corresponding slot is regared as free no matter whether 
> the produced partition is consumed or not. Actually we could think both task 
> and its partitionsoccupy resources in slot. So the slot can be regared as 
> free until the internal partition is consumed and released. Then the TM 
> release logic is also improved meanwhile. I think your suggestions below 
> already gives the detail and specific process for this improvement.
> 
> I am in favor of launching a separate thread for this discussion again, 
> thanks for the advice!
> 
> Best,
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> From:Till Rohrmann <trohrm...@apache.org>
> Send Time:2019年1月28日(星期一) 19:14
> To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> Cc:Andrey Zagrebin <and...@da-platform.com>
> Subject:Re: [DISCUSS] Proposal of external shuffle service
> 
> Thanks for creating the FLIP-31 for the external shuffle service Zhijiang. It 
> looks good to me. 
> 
> One thing which is not fully clear to me yet is how the lifecycle management 
> of the partitions integrates with the slot management. At the moment, 
> conceptually we consider the partition data being owned by the TM if I 
> understood it correctly. This means the ShuffleMaster is asked whether a TM 
> can be freed. However, the JobMaster only thinks in terms of slots and not 
> TMs. Thus, the logic would be that the JM asks the ShuffleMaster whether it 
> can return a certain slot. Atm the freeing of slots is done by the `SlotPool` 
> and, thus this would couple the `SlotPool` and the `ShuffleMaster`. Maybe we 
> need to introduce some mechanism to signal when a slot has still some 
> occupied resources. In the shared slot case, one could think of allocating a 
> dummy slot in the shared slot which we only release after the partition data 
> has been consumed.
> 
> In order to give this design document a little bit more visibility, I would 
> suggest to post it again on the dev mailing list in a separate thread under 
> the title "[DISCUSS] Flip-31: Pluggable Shuffle Manager" or something like 
> this.
> 
> Cheers,
> Till
> On Mon, Jan 21, 2019 at 7:05 AM zhijiang <wangzhijiang...@aliyun.com.invalid> 
> wrote:
> Hi all,
> 
> FYI, I created the FLIP-31 under [1] for this proposal and created some 
> subtasks under umbrella jira [2].
> Welcome any concerns in previous google doc or speific jiras.
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager
> [2] https://issues.apache.org/jira/browse/FLINK-10653
> 
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> Send Time:2019年1月15日(星期二) 17:55
> To:Andrey Zagrebin <and...@da-platform.com>
> Cc:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] Proposal of external shuffle service
> 
> Hi all,
> 
> After continuous discussion with Andrey offline, we already reach an 
> agreement for this proposal and co-author the latest google doc under [1].
> 
> We plan to create FLIP and sub-tasks by the end of this week, and the first 
> MVP wishes to be covered in FLINK 1.8.
> 
> Welcome any feedbacks and suggestions! :)
> 
> [1] 
> https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing
> 
> Best,
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> From:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> Send Time:2018年12月25日(星期二) 15:33
> To:Andrey Zagrebin <and...@da-platform.com>
> Cc:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] Proposal of external shuffle service
> 
> Hi Andrey,
> 
> Thanks for efficient response for the UnknownShuffleDeploymentDescriptor 
> issue.
> 
> It is reasonable for considering this special case on both ShuffleMaster and 
> ShuffleService sides.
> On upstream ShuffleService side, the created ResultPartitionWriter decides 
> whether to notify ShuffleMaster of consumable partition when outputs the 
> first buffer or finishes.
> On ShuffleMaster side, it might define a method in ShuffleMaster interface 
> for handling this notification message from upstream side, and then 
> internally decide whether to update partition info for downstream sides or 
> not.
> On downstream ShuffleService side, it might define a method in ShuffleService 
> interface to handle the update partition info message from ShuffleMaster, 
> then it can find the corresponding created InputGate to update.
> The communication between ShuffleService and ShuffleMaster can make use of 
> TMGateway & JMGateway for current implementation. Certainly it can also rely 
> on other ways for different ShuffleManager implementations. I would update 
> the google doc to make this process clear if you also think so. :)
> 
> Best,
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> From:Andrey Zagrebin <and...@da-platform.com>
> Send Time:2018年12月25日(星期二) 02:32
> To:zhijiang <wangzhijiang...@aliyun.com>
> Cc:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] Proposal of external shuffle service
> 
> Hi Zhijiang,
> 
> Thanks for considering my thoughts and concerns. Those are just suggestions 
> for your design document.
> 
> My understanding about 2.1 was initially that shuffle service is also treated 
> as unknown in case of UnknownShuffleDeploymentDescriptor which is not quite 
> true.
> Thinking about it more, it might be actually up to shuffle service to decide 
> how to react on the events of producer or consumer deployment.
> Maybe, ShuffleMaster could have two register/deregister methods for input and 
> output (now partition) and/or also task state update method to communicate 
> status of ShuffleService running in TM.
> Internally shuffle implementation could decide how to communicate between 
> ShuffleMaster and ShuffleService. If shuffle is channel-based it can behave 
> in a similar way as now.
> I agree it probably needs more discussion and refactoring could be planned 
> step by step if it is too involving change.
> 
> Best,
> Andrey
> 
> On Mon, Dec 24, 2018 at 11:31 AM zhijiang <wangzhijiang...@aliyun.com> wrote:
> Hi Andrey,
> 
> Thanks for further research on this topic and providing very helpful 
> summaries.  
> 
> As we discussed before, I really like the idea of dividing two separate 
> components on both JM and TM sides.
> 
> 1. On JM side, the ShuffleMaster componenet created from ShuffleManager can 
> manage and handle partition related issues properly.
> 
> 1.1 The introduction of PartitionShuffleDescriptor and 
> PartitiondDeploymentDescriptor is suitable for covering all the necessary 
> infos related with partition during deployment process and other future 
> extensions. The form of this new descriptor is also consistent with existing 
> ResultPartitionDeploymentDescriptor and InputGateDeploymentDescriptor.
> 
> 2. On TM side, the ShuffleService component created from ShuffleManager is a 
> TM level service, which can be used for creating ResultPartitionWriter and 
> InputGate during task deployment.
> 
> 
> 2.1 Concerning of updating UnknownShuffleDeploymentDescriptor,  I think it 
> may bring an argument that whether the ShuffleService should provide a 
> separate method for updating it or not. In other words, because the InputGate 
> is created by ShuffleService, then whether all the possible operations for 
> InputGate such as update or release should be handled via ShuffleService? I 
> think it can be interpreted to operate InputGate directly if the update or 
> release is general for all the ShuffleService implementations. But the 
> InputGate interface should provide the explicit methods for releasing itself 
> and updating input channels to make the whole process work.
> 
> 2.2 In addition, some implementation details can be further confirmed in 
> separate JIRAs,  such as whether we need task info related parameters during 
> creating writer, and how to extract necessary components from current 
> NetworkEnvrironment to wrap in specific ShuffleService implementation, etc.
> 
> 3. For the points mentioned in future extensions, I agree with your analysis. 
> We can focus on them separately step by step in different priorities. The 
> above ShuffleMaster provides a basic precondition for decoupling the life 
> cycles between partition state and task state. Then we can further extend the 
> methods in ShuffleMaster to know whether the partition is still available for 
> speeding up failover, and whether the partition is consumed by downstream to 
> decide when to release TM or clean partition, etc. It is also a good idea to 
> further refactor the interfaces on writer and reader sides to fine-grained 
> handle raw record instead of Buffer. And it would be involved in more changes 
> in current RecordWriter/StreamInputProcessor.
> 
> I think we can further confirm the above 2.1 issue, then I would adjust the 
> google doc based on our conclusions which cover not only the first step, but 
> also all the future extensions described and listed in priority. 
> BTW, do you think it is necessary that we further co-author a FLIP for this 
> feature? It is actually involved in many changes on both TM, JM sides.  :)
> 
> Best,
> Zhijiang
> 
> 
> 
> ------------------------------------------------------------------
> From:Andrey Zagrebin <and...@data-artisans.com>
> Send Time:2018年12月20日(星期四) 01:20
> To:zhijiang <wangzhijiang...@aliyun.com>
> Cc:dev <dev@flink.apache.org>
> Subject:Re: [DISCUSS] Proposal of external shuffle service
> 
> Hi Zhijiang,
> 
> Thanks for detailed answers! I am glad we are on the same page.
> 
> I spent some time on thinking more about our concerns and decided to make 
> more suggestions for the discussion.
> At the end, I also gathered some points related to possible extensions of 
> shuffle API to verify that the final pluggable design can support them later 
> with less changes.
> 
> 
> It might make sense for shuffle implementation to have component running on 
> both JM and TM sides.
> JM has a global view of what is happening and can interact with shuffling 
> system independently on whether tasks are running or not. The component 
> services could internally further communicate to each other outside of 
> existing JM/TM APIs, depending on shuffle implementation.
> It could help later with partition global life cycle management and cleanup.
> Moreover, if we decide to use some ShuffleDeploymentDescriptor instead of 
> ResultPartitionLocation or factories to instantiate Readers and Writers, they 
> can be created in Task Executor. 
> JM is probably not interested in this concern. ShuffleDeploymentDescriptor 
> can be specific to shuffle implementation, like factories, and contain 
> specific shuffle config for task side.
> 1. Configuration:
> interface ShuffleManager {
>   ShuffleMaster createMaster(Configuration flinkConfig);
>   ShuffleService createService(Configuration flinkConfig);
> }
> ShuffleManager is a factory for ShuffleMaster (JM side) and ShuffleService 
> (TM side).
> Flink config could also contain specific shuffle configuration, like port etc.
> Class which implements ShuffleManager in Flink cluster config, default is 
> what we have now (can be the first step)
> 2. Job master side
> class PartitionShuffleDescriptor {
>   JobID, ExecutionAttemptID, ResultPartitionType, ResultPartitionLocation, 
> TaskManagerLocation, etc
>   later possibly ShuffleType/Descriptor to choose from available shuffle 
> implementations
> }
> PartitionShuffleDescriptor contains all abstract information which JM can 
> provide from the job/execution graph.
> ResultPartitionType and ResultPartitionLocation are derived from graph and 
> execution mode, 
> so I think they are rather general parameters for any shuffle service and do 
> not belong to particular shuffle implementation.
> interface ShuffleMaster extends AutoClosable {
>   ShuffleDeploymentDescriptor registerPartition(PartitionShuffleDescriptor);
>   void deregisterPartition(PartionShuffleDescriptor);
> }
> JM process creates ShuffleMaster from configured per cluster ShuffleManager. 
> JM is responsible for its life cycle.
> ShuffleMaster is a global manager for partitions.
> JM creates PartitionShuffleDescriptor and uses ShuffleMaster to register 
> partition, e.g. when producer is deployed. 
> ShuffleMaster transforms abstract PartitionShuffleDescriptor into a specific 
> ShuffleDeploymentDescriptor.
> ShuffleDeploymentDescriptor is put into ResultPartitionDeploymentDescriptor 
> and InputGateDeploymentDescriptor.
> It can contain specific partition config for ShuffleService on TM side to 
> serve record readers and writers. 
> If it is channel-based then further break down to channel configs.
> Special UnknownShuffleDeploymentDescriptor could be used for eager deployment 
> when task input is unknown yet.
> Later, we could add an option to release partition globally by deregistering 
> it with the ShuffleMaster. e.g. to clean it up.
> 3. Task executor side
> interface ShuffleService extends AutoClosable {
>   ResultPartitionWriter 
> createResultPartitionWriter(ResultPartitionDeploymentDescriptor);
>   InputGate createInputGate(InputGateDeploymentDescriptor);
> }
> TM process creates ShuffleService from configured per cluster ShuffleManager. 
> TM is responsible for its life cycle.
> ShuffleService could substitute NetworkEnvironment in TaskManagerServices.
> 4. Later extensions
> 4.1 Per job/job edge config
> To keep jobs cluster independent, we could introduce abstract predefined 
> ShuffleType’s or descriptors
> for job developer to set it per job or job edge. The types are 
> cluster-independent.
> Cluster config could contain provided ShuffleManager implementation class for 
> each supported ShuffleType or fallback to default for some types.
> Instead of one ShuffleMaster/ShuffleService, JM/TM could have keep a registry 
> of ShuffleMaster/ShuffleService’s per ShuffleType.
> 4.2 Delay TM shutdown until all local partitions have been consumed
> JM could keep separately state of partition life cycle (e.g. in job state, 
> HA). The task executor is to shutdown (e.g. after timeout in yarn) if all its 
> tasks are done and all local partitions are consumed. If there are no local 
> partitions then it can shutdown immediately. Whether JM should check that all 
> produced by TM partitions are consumed is a feature of ShuffleManager. This 
> could be done by calling some ShuffleManager.getFeatures() interface method.
> 4.3 Speed up failover
> If partition is computed JM could reuse it as mention in fine-grained shuffle 
> system design. Whether the partition is still available after task or task 
> executor crash is also a feature of ShuffleManager.getFeatures().
> 4.4 Partition garbage collection
> When the consumer task is done, the partition is to deregister and cleanup 
> with the ShuffleMaster. 
> In case of external storage, partitions are at risk to linger after 
> job/cluster failures. The partition TTL is one option as mentioned in 
> fine-grained shuffle system design. The TTL timer could be started when there 
> is no partition access activity for certain period of time but there is 
> always risk to lose partition too early. User could try to recover failed job 
> any time later. So it might need more sophisticated approach, like manual 
> cleanup triggering (ShuffleMaster.cleanup(PartitionsInUse)) which drops all 
> currently unused partitions.
> 4.5 Shuffle Reader/Writers operation per record/byte[]/buffer
> As discussed, ResultPartitionWriter/InputGate operates on buffers with 
> serialised records data. Certain shuffle services might benefit from 
> operating per serialised records or even java objects (e.g. local channel 
> could hand over them or their copies from TypeSerializer.copy()). Record key 
> could be treated as its meta info, additionally to bytes or to user java 
> object.
> ShuffleService could be refactored later to return RecordReader/RecordWriter. 
> They could extend AbstractSerialisingRecordReader/Writer or 
> AbstractBufferingRecordReader/Writer to import current behaviour and share 
> code. This requires refactoring of StreamInputProcessor and RecordWriter to 
> extract the interfaces.
> It might be useful for ResultPartitionWriter/InputGate or 
> RecordReader/RecordWriter also to extend AutoClosable in case the internal 
> implementation needs a per task life cycle for them.
> 
> I hope it can help with the design. Feel free to give feedback.
> 
> Best,
> Andrey
> 
> On 10 Dec 2018, at 08:41, zhijiang <wangzhijiang...@aliyun.com> wrote:
> Hi Andrey,
> 
> Thanks for providing so detailed concerns and enlightenments for this 
> proposal. We exchanged our views of three main issues on google doc last week 
> and it seems more appropriate to further contact here. :)
> 
> 1. Configuration level for shuffle (cluster/job/operator)
> - how do we share shuffle manager resources among different job tasks within 
> one task executor process? It could be some static objects shared by all 
> shuffle manager objects of some type but it might be not scalable approach. 
> Example could be multiplexed netty connections (as I understand, current 
> netty stack can become just custom shuffle service).
> The creation of ShuffleManager instance on task level is just like the 
> process of creating StateBackend in StateBackendLoader. The ShuffleService 
> and ShuffleManager are two independent components, and the interaction 
> between them is only registration mechanism. In detail, if some 
> ShuffleManager instances want to rely ShuffleService to transport data, then 
> it can register related infos to ShuffleService during creation of 
> ResultPartitionWriter. So the ShuffleManager instance do not need  contain 
> any objects like netty related stacks. The flink runtime can provide one 
> unified netty-based ShuffleService which can be started in both internal 
> TaskManager or external containers. The internal ShuffleService not only 
> takes the role of tranporting data directly for some ShuffleManager instances 
> but also takes the role of RPC server for communicating with external 
> ShuffleService, such as register result partition to external service, 
> otherwise the external service might need an additional RPC service to 
> contact with TaskManager.  Here we have the implicit meaning to make intenral 
> shuffle as a basic service started in TaskManager like the components of 
> IOManager and MemoryManager, even thought useless for some type jobs.
> - In case of having it per job, we might need to provide compatibility check 
> between shuffle service and cluster mode (e.g. yarn ext shuffle service for 
> standalone mode cluster) if it is an issue.
> - Having it per job feels like the same complexity as having it per operator, 
> at the first glance, just changes its granularity and where objects reside.
> - what is the problem to use cluster per job mode? Then shuffle manager per 
> cluster and per job is the same but might simplify other issues at the 
> beginning. Streaming and batch jobs with different shuffle requirements could 
> be started in different clusters per job.
> 
> I totally agree with the above concerns for per job configuration. As you 
> mentioned, it is a option to run different type jobs in different clusters. 
> But in some special scenarios like hybrid cluster to run online and offline 
> jobs in differemt times, it is betterto support job level configuration for 
> fexibility. Certainly it may not be a strong requirements for most cases, 
> then we can reach an agreement to make the cluster level as the easiest way 
> first and adjut the level if needed in future.
> 
> 2. ShuffleManager interface
> 
> I think you mentioned three sub issues in this part:
> 
> 2.1 Introduction of additional ResultPartitionWriterFactory && 
> InputGateReaderFactory
> 
> I am not against the introduction of these two factories. The original 
> introduction of pluggable ShuffleManager interface is for creating different 
> writer and reader sides. If the ShuffleManager interface is used for creating 
> factories, and then the factories are used for creating writer and reader. I 
> still think the essence is same, and only the form is different.  That is the 
> ShuffleManager concept is seen on JobManager side, and the task only sees the 
> corresponding factories from ShuffleManager. In other words, we add another 
> factory layer to distinguish between JobManager and task. The form might seem 
> a bit better to introduce corresponding factories, so I am willing to take 
> this way for implementation.
> 
> 2.2 Whether to retain getResultPartitionLocation method in ShuffleManager 
> interface
> 
> If I understand correctly, you mean to put this location as an argument in 
> InputGateReaderFacotry constructor? If to do so, I think it makes sense and 
> we can avoid have this explicit method in interface. But we also need to 
> adjust the existing related process like updatePartitionInfo for downstream 
> side. In this case, the partition location is unknown during deploying 
> downstream tasks. Based on upstream's consumable notification, the location 
> update is triggered by JobManager to downstream side.
> 
> 2.3 ShuffleService interface
> 
> My initial thought is not making it as an interface. Because for internal or 
> external shuffle cases, they can reuse the same unified netty-based shuffle 
> service if we wrap the related componenets into current shuffle service well. 
> If we want to furtherextend other implementations of shuffle service, like 
> http-based shuffle service, then we can define an interface for it, the way 
> as current RpcService interface to get ride of only akka implementations. So 
> it also makes sense on my side to keep this interface. As for 
> ShuffleServiceRegistry class, I agree with you to have this TaskManager level 
> service for managing and sharing for all the internal tasks.
> 
> In summary, I think we do not have essential conflicts for above issues, 
> almost for the implementation aspects. And I agree with the above points, 
> especially for above 2.2 you might need double check if I understand 
> correctly. 
> Wish your further feedbacks then I can adjust the docs based on it.  Also 
> welcome any other person's feedbacks!
> 
> 
> Best,
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> 发件人:Andrey Zagrebin <and...@data-artisans.com>
> 发送时间:2018年12月10日(星期一) 05:18
> 收件人:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
> 抄 送:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski 
> <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>; Till Rohrmann 
> <trohrm...@apache.org>
> 主 题:Re: [DISCUSS] Proposal of external shuffle service
> 
> Hi Zhijiang,
> 
> 
> Thanks for sharing the document Zhijiang. 
> I decided to compile my thoughts to consider here, not to overload document 
> comments any more :)
> I think I still have question about job level configuration for the shuffle 
> service. You mentioned that we can keep several shuffle manager objects in 
> one task executor for different jobs. This is valid. My concerns are:
> - how do we share shuffle manager resources among different job tasks within 
> one task executor process? It could be some static objects shared by all 
> shuffle manager objects of some type but it might be not scalable approach. 
> Example could be multiplexed netty connections (as I understand, current 
> netty stack can become just custom shuffle service).
> - In case of having it per job, we might need to provide compatibility check 
> between shuffle service and cluster mode (e.g. yarn ext shuffle service for 
> standalone mode cluster) if it is an issue.
> - Having it per job feels like the same complexity as having it per operator, 
> at the first glance, just changes its granularity and where objects reside.
> - what is the problem to use cluster per job mode? Then shuffle manager per 
> cluster and per job is the same but might simplify other issues at the 
> beginning. Streaming and batch jobs with different shuffle requirements could 
> be started in different clusters per job. 
> As for ShuffleManager interface, I think I see your point with the 
> ResultPartitionLocation. I agree that partition needs some addressing of 
> underlying connection or resources in general. It can be thinked of as an 
> argument of ShuffleManager factory methods.
> My point is that task code might not need to be coupled to shuffle interface. 
> This way we could keep task code more independent of records transfer layer. 
> We can always change later how shuffle/network service is organised 
> internally without any consequences for the general task code. If task code 
> calls just factories provided by JM, it might not even matter for the task in 
> future whether it is configured per cluster, job or operator. Internally, 
> factory can hold location of concrete type if needed.
> Code example could be:
> Job Manager side:
> interface ShuffleManager {
>   ResultPartionWriterFactory 
> createResultPartionWriterFactory(job/task/topology descriptors);
>   // similar for input gate factory
> }
> class ShuffleManagerImpl implements ShuffleManager {
>   private general config, services etc;
>   ResultPartionWriterFactory 
> createResultPartionWriterFactory(job/task/topology descriptors) {
>     return new ResultPartionWriterFactoryImpl(location, job, oper id, other 
> specific config etc);
>   }
>   // similar for input gate factory
> }
> ...
> // somewhere in higher level code put ResultPartionWriterFactory into 
> descriptor
> Task executor side receives the factory inside the descriptor and calls 
> factory.create(ShuffleServiceRegistry). Example of factory:
> class ResultPartionWriterFactoryImpl implements ResultPartionWriterFactory {
>   // all fields are lightweight and serialisable, received from JM
>   private location, shuffle service id, other specific config etc;
> 
>  ResultPartionWriter create(ShuffleServiceRegistry registry, maybe more 
> generic args) {
>     // get or create task local specific ShuffleServiceImpl by id in registry
>     // ShuffleServiceImpl object can be shared between jobs
>     // register with the ShuffleServiceImpl by location, id, config etc
>   }
> }
> interface ShuffleService extends AutoClosable {
>   getId();
> }
> ShuffleServiceImpl manages resources and decides internally whether to do it 
> per task executor, task, job or operator. It can contain network stack, e,g, 
> netty connections etc. In case of external service, it can hold partition 
> manager, transport client etc. It is not enforced to have it per job by this 
> contract or even to have it at all. ShuffleServiceImpl also does not need to 
> depend on all TaskManagerServices, only create relevant inside, e.g. network.
> class ShuffleServiceRegistry {
>   <T extends ShuffleService> T getShuffleService(id);
>  registerShuffleService(ShuffleService, id);
>   deregisterShuffleService(id); // remove and close ShuffleService
>   close(); // close all
> }
> ShuffleServiceRegistry is just a generic container of all available 
> ShuffleService’s. It could be part of TaskManagerServices instead of 
> NetworkEnvironment which could go into specific ShuffleServiceImpl.
> 
> I might still miss some details, I would appreciate any feedback.
> 
> Best,
> Andrey
> 
> On 28 Nov 2018, at 08:59, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:
> Hi all,
> 
> I adjusted the umbrella jira [1] and corresponding google doc [2] to narrow 
> down the scope of introducing pluggable shuffle manager architecture as the 
> first step. 
> Welcome further feedbacks and suggestions, then I would create specific 
> subtasks for it to forward.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10653
> 
> [2] 
> https://docs.google.com/document/d/1ssTu8QE8RnF31zal4JHM1VaVENow-PweUtXSRr68nGg/edit?usp=sharing
> ------------------------------------------------------------------
> 发件人:zhijiang <wangzhijiang...@aliyun.com.INVALID>
> 发送时间:2018年11月1日(星期四) 17:19
> 收件人:dev <dev@flink.apache.org>; Jin Sun <isun...@gmail.com>
> 抄 送:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski 
> <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>
> 主 题:回复:[DISCUSS] Proposal of external shuffle service
> 
> Thanks for the efficient response till!
> 
> Thanks sunjin for the good feedbacks, we will further confirm with the 
> comments then! :)
> ------------------------------------------------------------------
> 发件人:Jin Sun <isun...@gmail.com>
> 发送时间:2018年11月1日(星期四) 06:42
> 收件人:dev <dev@flink.apache.org>
> 抄 送:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>; Nico Kruber 
> <n...@data-artisans.com>; Piotr Nowojski <pi...@data-artisans.com>; Stephan 
> Ewen <se...@apache.org>
> 主 题:Re: [DISCUSS] Proposal of external shuffle service
> 
> Thanks Zhijiang for the proposal. I like the idea of external shuffle 
> service, have left some comments on the document. 
> 
> On Oct 31, 2018, at 2:26 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Thanks for the update Zhijiang! The community is currently quite busy with
> the next Flink release. I hope that we can finish the release in two weeks.
> After that people will become more responsive again.
> 
> Cheers,
> Till
> 
> On Wed, Oct 31, 2018 at 7:49 AM zhijiang <wangzhijiang...@aliyun.com> wrote:
> 
> I already created the umbrella jira [1] for this improvement, and attched
> the design doc [2] in this jira.
> 
> Welcome for further discussion about the details.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-10653
> [2]
> https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing
> 
> 
> <https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing>
> Best,
> Zhijiang
> 
> ------------------------------------------------------------------
> 发件人:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com.INVALID>
> 发送时间:2018年9月11日(星期二) 15:21
> 收件人:dev <dev@flink.apache.org>
> 抄 送:dev <dev@flink.apache.org>
> 主 题:回复:[DISCUSS] Proposal of external shuffle service
> 
> Many thanks Till!
> 
> 
> I would create a JIRA for this feature and design a document attched with it.
> I will let you know after ready! :)
> 
> Best,
> Zhijiang
> 
> 
> ------------------------------------------------------------------
> 发件人:Till Rohrmann <trohrm...@apache.org>
> 发送时间:2018年9月7日(星期五) 22:01
> 收件人:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>
> 抄 送:dev <dev@flink.apache.org>
> 主 题:Re: [DISCUSS] Proposal of external shuffle service
> 
> The rough plan sounds good Zhijiang. I think we should continue with what
> you've proposed: Open a JIRA issue and creating a design document which
> outlines the required changes a little bit more in detail. Once this is
> done, we should link the design document in the JIRA issue and post it here
> for further discussion.
> 
> Cheers,
> Till
> 
> On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
> 
> Glad to receive your positive feedbacks Till!
> 
> Actually our motivation is to support batch job well as you mentioned.
> 
> For output level, flink already has the Subpartition abstraction(writer),
> and currently there are PipelinedSubpartition(memory output) and
> SpillableSubpartition(one-sp-one-file output) implementations. We can
> extend this abstraction to realize other persistent outputs (e.g.
> sort-merge-file).
> 
> For transport level(shuffle service), the current SubpartitionView
> abstraction(reader) seems as the brige linked with the output level, then
> 
> the view can understand and read the different output formats. The current
> NetworkEnvironment seems take the role of internal shuffle service in
> TaskManager and the transport server is realized by netty inside. This
> 
> component can also be started in other external containers like NodeManager
> of yarn to take the role of external shuffle service. Further we can
> 
> abstract to extend the shuffle service for transporting outputs by http or
> 
> rdma instead of current netty.  This abstraction should provide the way for
> output registration in order to read the results correctly, similar with
> current SubpartitionView.
> 
> The above is still a rough idea. Next I plan to create a feature jira to
> cover the related changes if possible. It would be better if getting help
> from related committers to review the detail designs together.
> 
> Best,
> Zhijiang
> 
> ------------------------------------------------------------------
> 发件人:Till Rohrmann <trohrm...@apache.org>
> 发送时间:2018年8月29日(星期三) 17:36
> 收件人:dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com>
> 主 题:Re: [DISCUSS] Proposal of external shuffle service
> 
> Thanks for starting this design discussion Zhijiang!
> 
> I really like the idea to introduce a ShuffleService abstraction which
> 
> allows to have different implementations depending on the actual use case.
> 
> Especially for batch jobs I can clearly see the benefits of persisting the
> results somewhere else.
> 
> Do you already know which interfaces we need to extend and where to
> introduce new abstractions?
> 
> Cheers,
> Till
> 
> On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999)
> <wangzhijiang...@aliyun.com.invalid> wrote:
> Hi all!
> 
> 
> The shuffle service is responsible for transporting upstream produced data
> to the downstream side. In flink, the NettyServer is used for network
> 
> transport service and this component is started in the TaskManager process.
> That means the TaskManager can support internal shuffle service which
> exists some concerns:
> 1. If a task finishes, the ResultPartition of this task still retains
> registered in TaskManager, because the output buffers have to be
> transported by internal shuffle service in TaskManager. That means the
> TaskManager can not be released by ResourceManager until ResultPartition
> released. It may waste container resources and can not support well for
> dynamic resource scenarios.
> 2. If we want to expand another shuffle service implementation, the
> current mechanism is not easy to handle, because the output level (result
> partition) and transport level (shuffle service) are not divided clearly
> and loss of abstraction to be extended.
> 
> For above considerations, we propose the external shuffle service which
> can be deployed on any other external contaienrs, e.g. NodeManager
> 
> container in yarn. Then the TaskManager can be released ASAP ifneeded when
> all the internal tasks finished. The persistent output files of these
> finished tasks can be served to transport by external shuffle service in
> the same machine.
> 
> Further we can abstract both of the output level and transport level to
> 
> support different implementations. e.g. We realized merging the data of all
> 
> the subpartitions into limited persistent local files for disk improvements
> in some scenarios instead of one-subpartiton-one-file.
> 
> I know it may be a big work for doing this, and I just point out some
> ideas, and wish getting any feedbacks from you!
> 
> Best,
> Zhijiang
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 


Reply via email to