Hi Zhijiang, I see there’s a YarnShuffleService in the newly released Blink branch. Is there any relationship between that YarnShuffleService and your external shuffle service?
Regards, Qi > On Jan 28, 2019, at 8:07 PM, zhijiang <wangzhijiang...@aliyun.com.INVALID> > wrote: > > Hi Till, > > Very glad to receive your feedback; it is actually very helpful. > > The proposed ShuffleMaster in JM would be involved in many existing > processes, such as task deployment, task failover, and TM release, so it might > interact with the corresponding Scheduler, FailoverStrategy, and SlotPool > components. In the first version we try to focus on the deployment process, which > is described in detail in the FLIP. Concerning the other improvements based > on the proposed architecture, we just mentioned the basic ideas and have not > given the whole detailed process. But I think it is reasonable and natural to > solve these issues based on that. And we would further give more details for > the other future steps. > > I totally agree with your thought on handling TM release. Currently, once a > task is finished, the corresponding slot is regarded as free no matter whether > the produced partition is consumed or not. Actually, we could consider that both a task > and its partitions occupy resources in a slot. So the slot can be regarded as > free only once the internal partition is consumed and released. Then the TM > release logic is also improved accordingly. I think your suggestions below > already give a detailed and specific process for this improvement. > > I am in favor of launching a separate thread for this discussion again; > thanks for the advice! > > Best, > Zhijiang > > > ------------------------------------------------------------------ > From:Till Rohrmann <trohrm...@apache.org> > Send Time: Jan 28, 2019 (Monday) 19:14 > To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com> > Cc:Andrey Zagrebin <and...@da-platform.com> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Thanks for creating FLIP-31 for the external shuffle service, Zhijiang. It > looks good to me.
> > One thing which is not fully clear to me yet is how the lifecycle management > of the partitions integrates with the slot management. At the moment, > conceptually we consider the partition data being owned by the TM if I > understood it correctly. This means the ShuffleMaster is asked whether a TM > can be freed. However, the JobMaster only thinks in terms of slots and not > TMs. Thus, the logic would be that the JM asks the ShuffleMaster whether it > can return a certain slot. Atm the freeing of slots is done by the `SlotPool` > and, thus, this would couple the `SlotPool` and the `ShuffleMaster`. Maybe we > need to introduce some mechanism to signal when a slot still has some > occupied resources. In the shared slot case, one could think of allocating a > dummy slot in the shared slot which we only release after the partition data > has been consumed. > > In order to give this design document a little bit more visibility, I would > suggest posting it again on the dev mailing list in a separate thread under > the title "[DISCUSS] Flip-31: Pluggable Shuffle Manager" or something like > this. > > Cheers, > Till > On Mon, Jan 21, 2019 at 7:05 AM zhijiang <wangzhijiang...@aliyun.com.invalid> > wrote: > Hi all, > > FYI, I created FLIP-31 under [1] for this proposal and created some > subtasks under the umbrella jira [2]. > Welcome any concerns in the previous google doc or specific jiras.
> > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-31%3A+Pluggable+Shuffle+Manager > [2] https://issues.apache.org/jira/browse/FLINK-10653 > > Best, > Zhijiang > ------------------------------------------------------------------ > From:zhijiang <wangzhijiang...@aliyun.com.INVALID> > Send Time: Jan 15, 2019 (Tuesday) 17:55 > To:Andrey Zagrebin <and...@da-platform.com> > Cc:dev <dev@flink.apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Hi all, > > After continuous discussion with Andrey offline, we have already reached an > agreement on this proposal and co-authored the latest google doc under [1]. > > We plan to create the FLIP and sub-tasks by the end of this week, and we hope the first > MVP can be covered in Flink 1.8. > > Welcome any feedback and suggestions! :) > > [1] > https://docs.google.com/document/d/1l7yIVNH3HATP4BnjEOZFkO2CaHf1sVn_DSxS2llmkd8/edit?usp=sharing > > Best, > Zhijiang > > > ------------------------------------------------------------------ > From:zhijiang <wangzhijiang...@aliyun.com.INVALID> > Send Time: Dec 25, 2018 (Tuesday) 15:33 > To:Andrey Zagrebin <and...@da-platform.com> > Cc:dev <dev@flink.apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Hi Andrey, > > Thanks for the efficient response on the UnknownShuffleDeploymentDescriptor > issue. > > It is reasonable to consider this special case on both the ShuffleMaster and > ShuffleService sides. > On the upstream ShuffleService side, the created ResultPartitionWriter decides > whether to notify the ShuffleMaster of a consumable partition when it outputs the > first buffer or finishes. > On the ShuffleMaster side, it might define a method in the ShuffleMaster interface > for handling this notification message from the upstream side, and then > internally decide whether to update partition info for the downstream sides or > not.
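The notification path just described (upstream ResultPartitionWriter → ShuffleMaster → downstream partition-info update) could be sketched as follows. This is a minimal in-memory illustration only; the interface names, the PartitionId type, and the wiring are assumptions based on this discussion, not the actual Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical partition identifier; the real descriptors carry much more info.
record PartitionId(String value) {}

// Master side: handles consumable-partition notifications from the upstream
// ShuffleService (sent on the first buffer for pipelined partitions, or on
// finish for blocking ones).
interface ShuffleMasterNotifications {
    void notifyPartitionConsumable(PartitionId partition);
}

// Downstream side: receives partition-info updates so the corresponding
// InputGate can update its input channels.
interface ShuffleServiceUpdates {
    void updatePartitionInfo(PartitionId partition);
}

// Minimal in-memory master wiring the two directions together.
class InMemoryShuffleMaster implements ShuffleMasterNotifications {
    private final Map<PartitionId, List<ShuffleServiceUpdates>> consumers = new HashMap<>();

    void registerConsumer(PartitionId partition, ShuffleServiceUpdates consumer) {
        consumers.computeIfAbsent(partition, p -> new ArrayList<>()).add(consumer);
    }

    @Override
    public void notifyPartitionConsumable(PartitionId partition) {
        // Internally decide whether to forward; here we always forward the
        // update to every registered downstream consumer.
        consumers.getOrDefault(partition, List.of())
                 .forEach(c -> c.updatePartitionInfo(partition));
    }
}
```

In a real implementation the two sides would talk over the TMGateway/JMGateway RPC rather than direct calls, as the thread notes below.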
> On the downstream ShuffleService side, it might define a method in the ShuffleService > interface to handle the update-partition-info message from the ShuffleMaster; > then it can find the corresponding created InputGate to update. > The communication between ShuffleService and ShuffleMaster can make use of > TMGateway & JMGateway in the current implementation. Certainly it can also rely > on other ways for different ShuffleManager implementations. I would update > the google doc to make this process clear if you also think so. :) > > Best, > Zhijiang > > > ------------------------------------------------------------------ > From:Andrey Zagrebin <and...@da-platform.com> > Send Time: Dec 25, 2018 (Tuesday) 02:32 > To:zhijiang <wangzhijiang...@aliyun.com> > Cc:dev <dev@flink.apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Hi Zhijiang, > > Thanks for considering my thoughts and concerns. Those are just suggestions > for your design document. > > My understanding about 2.1 was initially that the shuffle service is also treated > as unknown in case of UnknownShuffleDeploymentDescriptor, which is not quite > true. > Thinking about it more, it might actually be up to the shuffle service to decide > how to react to the events of producer or consumer deployment. > Maybe, ShuffleMaster could have two register/deregister methods for input and > output (now partition) and/or also a task state update method to communicate the > status of the ShuffleService running in the TM. > Internally the shuffle implementation could decide how to communicate between > ShuffleMaster and ShuffleService. If the shuffle is channel-based, it can behave > in a similar way as now. > I agree it probably needs more discussion, and the refactoring could be planned > step by step if it is too involved a change. > > Best, > Andrey > > On Mon, Dec 24, 2018 at 11:31 AM zhijiang <wangzhijiang...@aliyun.com> wrote: > Hi Andrey, > > Thanks for further research on this topic and providing very helpful > summaries.
> > As we discussed before, I really like the idea of dividing this into two separate > components on both the JM and TM sides. > > 1. On the JM side, the ShuffleMaster component created from ShuffleManager can > manage and handle partition-related issues properly. > > 1.1 The introduction of PartitionShuffleDescriptor and > PartitionDeploymentDescriptor is suitable for covering all the necessary > info related to partitions during the deployment process and other future > extensions. The form of this new descriptor is also consistent with the existing > ResultPartitionDeploymentDescriptor and InputGateDeploymentDescriptor. > > 2. On the TM side, the ShuffleService component created from ShuffleManager is a > TM-level service, which can be used for creating the ResultPartitionWriter and > InputGate during task deployment. > > > 2.1 Concerning updating the UnknownShuffleDeploymentDescriptor, I think it > may raise the question of whether the ShuffleService should provide a > separate method for updating it or not. In other words, because the InputGate > is created by the ShuffleService, should all the possible operations on the > InputGate, such as update or release, be handled via the ShuffleService? I > think it is reasonable to operate on the InputGate directly if the update or > release is general for all ShuffleService implementations. But the > InputGate interface should provide explicit methods for releasing itself > and updating input channels to make the whole process work. > > 2.2 In addition, some implementation details can be further confirmed in > separate JIRAs, such as whether we need task-info-related parameters during > writer creation, and how to extract the necessary components from the current > NetworkEnvironment to wrap in a specific ShuffleService implementation, etc. > > 3. For the points mentioned in future extensions, I agree with your analysis. > We can focus on them separately, step by step, with different priorities.
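The explicit InputGate contract argued for in 2.1 above could be as minimal as the sketch below. This is a hypothetical shape with illustrative type names, not the actual Flink interface; the CountingInputGate is only there to make the sketch concrete:

```java
// What a producer-location update carries; illustrative only.
record ChannelUpdate(String partitionId, String producerLocation) {}

// The gate is created by the ShuffleService, but updating channels and
// releasing resources are general operations exposed on the gate itself,
// so callers need not go back through the ShuffleService.
interface InputGate extends AutoCloseable {
    // Replace an unknown input channel once the producer's location is known.
    void updateInputChannel(ChannelUpdate update);

    // Release all resources held by the gate and its channels.
    void releaseAllResources();

    @Override
    default void close() {
        releaseAllResources();
    }
}

// Trivial implementation counting calls, purely for illustration.
class CountingInputGate implements InputGate {
    int updates;
    boolean released;
    public void updateInputChannel(ChannelUpdate update) { updates++; }
    public void releaseAllResources() { released = true; }
}
```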
The > above ShuffleMaster provides a basic precondition for decoupling the life > cycles of partition state and task state. Then we can further extend the > methods in ShuffleMaster to know whether a partition is still available, for > speeding up failover, and whether a partition has been consumed by the downstream side, to > decide when to release a TM or clean up a partition, etc. It is also a good idea to > further refactor the interfaces on the writer and reader sides to handle raw > records in a fine-grained way instead of Buffers. And it would involve more changes > in the current RecordWriter/StreamInputProcessor. > > I think we can further confirm the above 2.1 issue; then I would adjust the > google doc based on our conclusions, covering not only the first step but > also all the future extensions, described and listed by priority. > BTW, do you think it is necessary that we further co-author a FLIP for this > feature? It actually involves many changes on both the TM and JM sides. :) > > Best, > Zhijiang > > > > ------------------------------------------------------------------ > From:Andrey Zagrebin <and...@data-artisans.com> > Send Time: Dec 20, 2018 (Thursday) 01:20 > To:zhijiang <wangzhijiang...@aliyun.com> > Cc:dev <dev@flink.apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Hi Zhijiang, > > Thanks for the detailed answers! I am glad we are on the same page. > > I spent some time thinking more about our concerns and decided to make > more suggestions for the discussion. > At the end, I also gathered some points related to possible extensions of the > shuffle API, to verify that the final pluggable design can support them later > with fewer changes. > > > It might make sense for a shuffle implementation to have a component running on > both the JM and TM sides. > JM has a global view of what is happening and can interact with the shuffling > system independently of whether tasks are running or not.
The component > services could internally further communicate with each other outside of the > existing JM/TM APIs, depending on the shuffle implementation. > It could help later with global partition life-cycle management and cleanup. > Moreover, if we decide to use some ShuffleDeploymentDescriptor instead of > ResultPartitionLocation or factories to instantiate Readers and Writers, they > can be created in the Task Executor. > JM is probably not interested in this concern. ShuffleDeploymentDescriptor > can be specific to the shuffle implementation, like the factories, and contain the > specific shuffle config for the task side. > 1. Configuration: > interface ShuffleManager { > ShuffleMaster createMaster(Configuration flinkConfig); > ShuffleService createService(Configuration flinkConfig); > } > ShuffleManager is a factory for ShuffleMaster (JM side) and ShuffleService > (TM side). > The Flink config could also contain specific shuffle configuration, like port etc. > The class which implements ShuffleManager is set in the Flink cluster config; the default is > what we have now (can be the first step). > 2. Job master side > class PartitionShuffleDescriptor { > JobID, ExecutionAttemptID, ResultPartitionType, ResultPartitionLocation, > TaskManagerLocation, etc > later possibly ShuffleType/Descriptor to choose from available shuffle > implementations > } > PartitionShuffleDescriptor contains all the abstract information which JM can > provide from the job/execution graph. > ResultPartitionType and ResultPartitionLocation are derived from the graph and > execution mode, > so I think they are rather general parameters for any shuffle service and do > not belong to a particular shuffle implementation. > interface ShuffleMaster extends AutoCloseable { > ShuffleDeploymentDescriptor registerPartition(PartitionShuffleDescriptor); > void deregisterPartition(PartitionShuffleDescriptor); > } > The JM process creates the ShuffleMaster from the configured per-cluster ShuffleManager. > JM is responsible for its life cycle.
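Collecting the configuration and job-master-side fragments above into one compilable sketch. The type names follow the thread; the descriptor fields, the Configuration stand-in, and the trivial LocalShuffleManager are illustrative assumptions, not the final API:

```java
// Abstract info the JM can derive from the job/execution graph (simplified).
class PartitionShuffleDescriptor {
    final String partitionId;
    PartitionShuffleDescriptor(String partitionId) { this.partitionId = partitionId; }
}

// Shuffle-specific info shipped to the TM inside the deployment descriptors.
class ShuffleDeploymentDescriptor {
    final String connectionInfo;
    ShuffleDeploymentDescriptor(String connectionInfo) { this.connectionInfo = connectionInfo; }
}

class Configuration {} // stand-in for the Flink configuration

// Cluster-level factory: one component per JM process, one per TM process.
interface ShuffleManager {
    ShuffleMaster createMaster(Configuration flinkConfig);   // JobManager side
    ShuffleService createService(Configuration flinkConfig); // TaskManager side
}

interface ShuffleMaster extends AutoCloseable {
    ShuffleDeploymentDescriptor registerPartition(PartitionShuffleDescriptor psd);
    void deregisterPartition(PartitionShuffleDescriptor psd);
    @Override default void close() {}
}

interface ShuffleService extends AutoCloseable {
    // Simplified; the thread proposes ResultPartitionDeploymentDescriptor
    // and InputGateDeploymentDescriptor as the real arguments.
    Object createResultPartitionWriter(ShuffleDeploymentDescriptor sdd);
    Object createInputGate(ShuffleDeploymentDescriptor sdd);
    @Override default void close() {}
}

// Toy implementation: translates the abstract descriptor into a
// shuffle-specific one, as the netty-based default might.
class LocalShuffleManager implements ShuffleManager {
    public ShuffleMaster createMaster(Configuration c) {
        return new ShuffleMaster() {
            public ShuffleDeploymentDescriptor registerPartition(PartitionShuffleDescriptor psd) {
                return new ShuffleDeploymentDescriptor("local://" + psd.partitionId);
            }
            public void deregisterPartition(PartitionShuffleDescriptor psd) {}
        };
    }
    public ShuffleService createService(Configuration c) {
        throw new UnsupportedOperationException("TM side omitted in this sketch");
    }
}
```

The key design point is that the JM only ever sees the abstract PartitionShuffleDescriptor, while everything shuffle-specific is opaque inside the ShuffleDeploymentDescriptor handed to the TM.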
> ShuffleMaster is a global manager for partitions. > JM creates a PartitionShuffleDescriptor and uses the ShuffleMaster to register the > partition, e.g. when the producer is deployed. > ShuffleMaster transforms the abstract PartitionShuffleDescriptor into a specific > ShuffleDeploymentDescriptor. > The ShuffleDeploymentDescriptor is put into ResultPartitionDeploymentDescriptor > and InputGateDeploymentDescriptor. > It can contain the specific partition config for the ShuffleService on the TM side to > serve record readers and writers. > If it is channel-based, it can be further broken down into channel configs. > A special UnknownShuffleDeploymentDescriptor could be used for eager deployment > when the task input is not yet known. > Later, we could add an option to release a partition globally by deregistering > it with the ShuffleMaster, e.g. to clean it up. > 3. Task executor side > interface ShuffleService extends AutoCloseable { > ResultPartitionWriter > createResultPartitionWriter(ResultPartitionDeploymentDescriptor); > InputGate createInputGate(InputGateDeploymentDescriptor); > } > The TM process creates the ShuffleService from the configured per-cluster ShuffleManager. > TM is responsible for its life cycle. > ShuffleService could substitute NetworkEnvironment in TaskManagerServices. > 4. Later extensions > 4.1 Per job/job edge config > To keep jobs cluster-independent, we could introduce abstract predefined > ShuffleType’s or descriptors > for the job developer to set per job or job edge. The types are > cluster-independent. > The cluster config could contain the provided ShuffleManager implementation class for > each supported ShuffleType, or fall back to a default for some types. > Instead of one ShuffleMaster/ShuffleService, JM/TM could keep a registry > of ShuffleMaster/ShuffleService’s per ShuffleType. > 4.2 Delay TM shutdown until all local partitions have been consumed > JM could separately keep the state of the partition life cycle (e.g. in job state, > HA). The task executor is to shut down (e.g.
after a timeout in yarn) if all its > tasks are done and all local partitions are consumed. If there are no local > partitions, then it can shut down immediately. Whether the JM should check that all > partitions produced by a TM are consumed is a feature of the ShuffleManager. This > could be done by calling some ShuffleManager.getFeatures() interface method. > 4.3 Speed up failover > If a partition is already computed, the JM could reuse it as mentioned in the fine-grained shuffle > system design. Whether a partition is still available after a task or task > executor crash is also a feature of ShuffleManager.getFeatures(). > 4.4 Partition garbage collection > When the consumer task is done, the partition is to be deregistered and cleaned up > with the ShuffleMaster. > In the case of external storage, partitions are at risk of lingering after > job/cluster failures. The partition TTL is one option, as mentioned in the > fine-grained shuffle system design. The TTL timer could be started when there > is no partition access activity for a certain period of time, but there is > always a risk of losing a partition too early. The user could try to recover a failed job > any time later. So it might need a more sophisticated approach, like manual > cleanup triggering (ShuffleMaster.cleanup(PartitionsInUse)), which drops all > currently unused partitions. > 4.5 Shuffle Reader/Writer operation per record/byte[]/buffer > As discussed, ResultPartitionWriter/InputGate operate on buffers with > serialised record data. Certain shuffle services might benefit from > operating on serialised records or even java objects (e.g. a local channel > could hand them over, or their copies from TypeSerializer.copy()). The record key > could be treated as its meta info, in addition to the bytes or the user java > object. > ShuffleService could be refactored later to return RecordReader/RecordWriter. > They could extend AbstractSerialisingRecordReader/Writer or > AbstractBufferingRecordReader/Writer to inherit current behaviour and share > code.
This requires refactoring of StreamInputProcessor and RecordWriter to > extract the interfaces. > It might be useful for ResultPartitionWriter/InputGate or > RecordReader/RecordWriter also to extend AutoCloseable in case the internal > implementation needs a per-task life cycle for them. > > I hope it can help with the design. Feel free to give feedback. > > Best, > Andrey > > On 10 Dec 2018, at 08:41, zhijiang <wangzhijiang...@aliyun.com> wrote: > Hi Andrey, > > Thanks for providing such detailed concerns and insights for this > proposal. We exchanged our views on three main issues in the google doc last week, > and it seems more appropriate to continue the discussion here. :) > > 1. Configuration level for shuffle (cluster/job/operator) > - how do we share shuffle manager resources among different job tasks within > one task executor process? It could be some static objects shared by all > shuffle manager objects of some type, but that might not be a scalable approach. > An example could be multiplexed netty connections (as I understand, the current > netty stack can just become a custom shuffle service). > The creation of a ShuffleManager instance at the task level is just like the > process of creating a StateBackend in StateBackendLoader. The ShuffleService > and ShuffleManager are two independent components, and the interaction > between them is only a registration mechanism. In detail, if some > ShuffleManager instance wants to rely on the ShuffleService to transport data, > it can register the related info with the ShuffleService during creation of the > ResultPartitionWriter. So the ShuffleManager instance does not need to contain > any objects like netty-related stacks. The flink runtime can provide one > unified netty-based ShuffleService which can be started in both the internal > TaskManager and external containers.
The internal ShuffleService not only > takes the role of transporting data directly for some ShuffleManager instances > but also the role of an RPC server for communicating with an external > ShuffleService, such as registering a result partition with the external service; > otherwise the external service might need an additional RPC service to > contact the TaskManager. The implicit meaning here is to make internal > shuffle a basic service started in the TaskManager, like the IOManager and > MemoryManager components, even though it is useless for some types of jobs. > - In case of having it per job, we might need to provide a compatibility check > between shuffle service and cluster mode (e.g. yarn ext shuffle service for a > standalone mode cluster) if it is an issue. > - Having it per job feels like the same complexity as having it per operator, > at first glance; it just changes its granularity and where objects reside. > - what is the problem to use cluster per job mode? Then shuffle manager per > cluster and per job is the same, but it might simplify other issues at the > beginning. Streaming and batch jobs with different shuffle requirements could > be started in different clusters per job. > > I totally agree with the above concerns about per-job configuration. As you > mentioned, it is an option to run different types of jobs in different clusters. > But in some special scenarios, like a hybrid cluster running online and offline > jobs at different times, it is better to support job-level configuration for > flexibility. Certainly it may not be a strong requirement for most cases, > so we can agree to make the cluster level the easiest first step > and adjust the level if needed in future. > > 2. ShuffleManager interface > > I think you mentioned three sub-issues in this part: > > 2.1 Introduction of additional ResultPartitionWriterFactory && > InputGateReaderFactory > > I am not against the introduction of these two factories.
The original > introduction of the pluggable ShuffleManager interface is for creating different > writer and reader sides. If the ShuffleManager interface is used for creating > factories, and the factories are then used for creating the writer and reader, I > still think the essence is the same and only the form is different. That is, the > ShuffleManager concept is seen on the JobManager side, and the task only sees the > corresponding factories from the ShuffleManager. In other words, we add another > factory layer to distinguish between JobManager and task. The form might be > a bit better with corresponding factories, so I am willing to take > this way for implementation. > > 2.2 Whether to retain the getResultPartitionLocation method in the ShuffleManager > interface > > If I understand correctly, you mean to put this location as an argument in the > InputGateReaderFactory constructor? If so, I think it makes sense and > we can avoid having this explicit method in the interface. But we also need to > adjust the existing related processes like updatePartitionInfo for the downstream > side. In this case, the partition location is unknown while deploying the > downstream tasks. Based on the upstream's consumable notification, the location > update is triggered by the JobManager towards the downstream side. > > 2.3 ShuffleService interface > > My initial thought was not to make it an interface, because the internal and > external shuffle cases can reuse the same unified netty-based shuffle > service if we wrap the related components into the current shuffle service well. > If we want to further extend to other shuffle service implementations, like an > http-based shuffle service, then we can define an interface for it, the same > way the current RpcService interface gets rid of akka-only implementations. So > it also makes sense on my side to keep this interface. As for the > ShuffleServiceRegistry class, I agree with you on having this TaskManager-level > service for managing and sharing across all the internal tasks.
> > In summary, I think we have no essential conflicts on the above issues, > mostly just on implementation aspects. And I agree with the above points; > especially for 2.2 above, you might need to double check whether I understood > correctly. > I look forward to your further feedback so I can adjust the docs based on it. Also > welcome any other person's feedback! > > > Best, > Zhijiang > > > ------------------------------------------------------------------ > From:Andrey Zagrebin <and...@data-artisans.com> > Send Time: Dec 10, 2018 (Monday) 05:18 > To:dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com> > Cc:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski > <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org>; Till Rohrmann > <trohrm...@apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Hi Zhijiang, > > > Thanks for sharing the document Zhijiang. > I decided to compile my thoughts to consider here, not to overload the document > comments any more :) > I think I still have a question about job-level configuration for the shuffle > service. You mentioned that we can keep several shuffle manager objects in > one task executor for different jobs. This is valid. My concerns are: > - how do we share shuffle manager resources among different job tasks within > one task executor process? It could be some static objects shared by all > shuffle manager objects of some type, but that might not be a scalable approach. > An example could be multiplexed netty connections (as I understand, the current > netty stack can just become a custom shuffle service). > - In case of having it per job, we might need to provide a compatibility check > between shuffle service and cluster mode (e.g. yarn ext shuffle service for a > standalone mode cluster) if it is an issue. > - Having it per job feels like the same complexity as having it per operator, > at first glance; it just changes its granularity and where objects reside. > - what is the problem to use cluster per job mode?
> Then shuffle manager per > cluster and per job is the same, but it might simplify other issues at the > beginning. Streaming and batch jobs with different shuffle requirements could > be started in different clusters per job. > As for the ShuffleManager interface, I think I see your point with the > ResultPartitionLocation. I agree that a partition needs some addressing of the > underlying connection or resources in general. It can be thought of as an > argument of the ShuffleManager factory methods. > My point is that task code might not need to be coupled to the shuffle interface. > This way we could keep task code more independent of the records transfer layer. > We can always change later how the shuffle/network service is organised > internally without any consequences for the general task code. If task code > calls just factories provided by JM, it might not even matter for the task in > future whether it is configured per cluster, job or operator. Internally, the > factory can hold a location of concrete type if needed. > A code example could be: > Job Manager side: > interface ShuffleManager { > ResultPartitionWriterFactory > createResultPartitionWriterFactory(job/task/topology descriptors); > // similar for input gate factory > } > class ShuffleManagerImpl implements ShuffleManager { > private general config, services etc; > ResultPartitionWriterFactory > createResultPartitionWriterFactory(job/task/topology descriptors) { > return new ResultPartitionWriterFactoryImpl(location, job, oper id, other > specific config etc); > } > // similar for input gate factory > } > ... > // somewhere in higher level code put ResultPartitionWriterFactory into the > descriptor > The task executor side receives the factory inside the descriptor and calls > factory.create(ShuffleServiceRegistry).
Example of factory: > class ResultPartitionWriterFactoryImpl implements ResultPartitionWriterFactory { > // all fields are lightweight and serialisable, received from JM > private location, shuffle service id, other specific config etc; > > ResultPartitionWriter create(ShuffleServiceRegistry registry, maybe more > generic args) { > // get or create task local specific ShuffleServiceImpl by id in registry > // ShuffleServiceImpl object can be shared between jobs > // register with the ShuffleServiceImpl by location, id, config etc > } > } > interface ShuffleService extends AutoCloseable { > getId(); > } > ShuffleServiceImpl manages resources and decides internally whether to do it > per task executor, task, job or operator. It can contain the network stack, e.g. > netty connections etc. In the case of an external service, it can hold a partition > manager, transport client etc. It is not enforced to have it per job by this > contract, or even to have it at all. ShuffleServiceImpl also does not need to > depend on all TaskManagerServices, only create the relevant ones inside, e.g. network. > class ShuffleServiceRegistry { > <T extends ShuffleService> T getShuffleService(id); > registerShuffleService(ShuffleService, id); > deregisterShuffleService(id); // remove and close ShuffleService > close(); // close all > } > ShuffleServiceRegistry is just a generic container of all available > ShuffleService’s. It could be part of TaskManagerServices instead of > NetworkEnvironment, which could go into a specific ShuffleServiceImpl. > > I might still miss some details; I would appreciate any feedback. > > Best, > Andrey > > On 28 Nov 2018, at 08:59, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote: > Hi all, > > I adjusted the umbrella jira [1] and the corresponding google doc [2] to narrow > down the scope to introducing the pluggable shuffle manager architecture as the > first step. > Welcome further feedback and suggestions; then I would create specific > subtasks for it to move forward.
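The ShuffleServiceRegistry container sketched in the thread above could be fleshed out roughly as follows. This is a sketch under the assumption that services are keyed by a string id and own their close semantics; it is not the eventual Flink class:

```java
import java.util.HashMap;
import java.util.Map;

// Per-task-executor service as proposed in the thread: each shuffle
// implementation registers one instance under its own id.
interface ShuffleService extends AutoCloseable {
    String getId();
    @Override default void close() {}
}

// Generic container of all available ShuffleService instances.
class ShuffleServiceRegistry implements AutoCloseable {
    private final Map<String, ShuffleService> services = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T extends ShuffleService> T getShuffleService(String id) {
        return (T) services.get(id);
    }

    void registerShuffleService(ShuffleService service) {
        services.put(service.getId(), service);
    }

    // Remove and close a single service.
    void deregisterShuffleService(String id) {
        ShuffleService removed = services.remove(id);
        if (removed != null) {
            removed.close();
        }
    }

    // Close all registered services.
    @Override
    public void close() {
        services.values().forEach(ShuffleService::close);
        services.clear();
    }
}
```

A factory shipped from the JM would then call `registry.getShuffleService(id)` (or register a new instance) on the task executor side, which is what allows one service object to be shared across jobs.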
> > [1] https://issues.apache.org/jira/browse/FLINK-10653 > > [2] > https://docs.google.com/document/d/1ssTu8QE8RnF31zal4JHM1VaVENow-PweUtXSRr68nGg/edit?usp=sharing > ------------------------------------------------------------------ > From:zhijiang <wangzhijiang...@aliyun.com.INVALID> > Send Time: Nov 1, 2018 (Thursday) 17:19 > To:dev <dev@flink.apache.org>; Jin Sun <isun...@gmail.com> > Cc:Nico Kruber <n...@data-artisans.com>; Piotr Nowojski > <pi...@data-artisans.com>; Stephan Ewen <se...@apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Thanks for the efficient response, Till! > > Thanks, Jin Sun, for the good feedback; we will further confirm against the > comments then! :) > ------------------------------------------------------------------ > From:Jin Sun <isun...@gmail.com> > Send Time: Nov 1, 2018 (Thursday) 06:42 > To:dev <dev@flink.apache.org> > Cc:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>; Nico Kruber > <n...@data-artisans.com>; Piotr Nowojski <pi...@data-artisans.com>; Stephan > Ewen <se...@apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Thanks Zhijiang for the proposal. I like the idea of an external shuffle > service, and I have left some comments on the document. > > On Oct 31, 2018, at 2:26 AM, Till Rohrmann <trohrm...@apache.org> wrote: > > Thanks for the update Zhijiang! The community is currently quite busy with > the next Flink release. I hope that we can finish the release in two weeks. > After that people will become more responsive again. > > Cheers, > Till > > On Wed, Oct 31, 2018 at 7:49 AM zhijiang <wangzhijiang...@aliyun.com> wrote: > > I already created the umbrella jira [1] for this improvement, and attached > the design doc [2] in this jira. > > Welcome further discussion about the details.
> > [1] https://issues.apache.org/jira/browse/FLINK-10653 > [2] > https://docs.google.com/document/d/1Jb0Mf46ace-6cLRQxJzo6VNQQVxn3hwf9Zqmv5pcb34/edit?usp=sharing > > Best, > Zhijiang > > ------------------------------------------------------------------ > From:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com.INVALID> > Send Time: Sep 11, 2018 (Tuesday) 15:21 > To:dev <dev@flink.apache.org> > Cc:dev <dev@flink.apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Many thanks Till! > > > I will create a JIRA for this feature and attach a design document to it. > I will let you know when it is ready! :) > > Best, > Zhijiang > > > ------------------------------------------------------------------ > From:Till Rohrmann <trohrm...@apache.org> > Send Time: Sep 7, 2018 (Friday) 22:01 > To:Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> > Cc:dev <dev@flink.apache.org> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > The rough plan sounds good Zhijiang. I think we should continue with what > you've proposed: open a JIRA issue and create a design document which > outlines the required changes a little bit more in detail. Once this is > done, we should link the design document in the JIRA issue and post it here > for further discussion. > > Cheers, > Till > > On Wed, Aug 29, 2018 at 6:04 PM Zhijiang(wangzhijiang999) < > wangzhijiang...@aliyun.com> wrote: > > Glad to receive your positive feedback, Till! > > Actually our motivation is to support batch jobs well, as you mentioned. > > For the output level, flink already has the Subpartition abstraction (writer), > and currently there are PipelinedSubpartition (memory output) and > SpillableSubpartition (one-sp-one-file output) implementations. We can > extend this abstraction to realize other persistent outputs (e.g. > sort-merge-file).
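The output-level extension point just described can be illustrated with a toy sketch. SortedSubpartition is purely illustrative (not an actual Flink class), and real implementations deal in Buffers and files rather than strings; it only shows the idea of buffering and sorting records before a merge/persist step:

```java
import java.util.ArrayList;
import java.util.List;

// Writer-side abstraction: PipelinedSubpartition keeps buffers in memory,
// SpillableSubpartition writes one file per subpartition; a sort-merge
// variant would buffer records and emit them sorted, so that many
// subpartitions can be merged into fewer persistent files.
abstract class ResultSubpartition {
    abstract void add(String serializedRecord);
    abstract List<String> finish(); // what would be handed to the merge/persist step
}

class SortedSubpartition extends ResultSubpartition {
    private final List<String> buffer = new ArrayList<>();

    @Override
    void add(String serializedRecord) {
        buffer.add(serializedRecord);
    }

    @Override
    List<String> finish() {
        buffer.sort(String::compareTo); // sort once, before merging/persisting
        return buffer;
    }
}
```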
> > For the transport level (shuffle service), the current SubpartitionView > abstraction (reader) seems to be the bridge linking it with the output level; > > the view can then understand and read the different output formats. The current > NetworkEnvironment seems to take the role of the internal shuffle service in the > TaskManager, and the transport server is realized by netty inside. This > > component can also be started in other external containers, like yarn's NodeManager, > to take the role of an external shuffle service. Further, we can > > abstract and extend the shuffle service to transport outputs by http or > > rdma instead of the current netty. This abstraction should provide a way for > output registration in order to read the results correctly, similar to the > current SubpartitionView. > > The above is still a rough idea. Next I plan to create a feature jira to > cover the related changes if possible. It would be better to get help > from related committers to review the detailed designs together. > > Best, > Zhijiang > > ------------------------------------------------------------------ > From:Till Rohrmann <trohrm...@apache.org> > Send Time: Aug 29, 2018 (Wednesday) 17:36 > To:dev <dev@flink.apache.org>; Zhijiang(wangzhijiang999) < > wangzhijiang...@aliyun.com> > Subject:Re: [DISCUSS] Proposal of external shuffle service > > Thanks for starting this design discussion Zhijiang! > > I really like the idea of introducing a ShuffleService abstraction which > > allows different implementations depending on the actual use case. > > Especially for batch jobs I can clearly see the benefits of persisting the > results somewhere else. > > Do you already know which interfaces we need to extend and where to > introduce new abstractions? > > Cheers, > Till > > On Mon, Aug 27, 2018 at 1:57 PM Zhijiang(wangzhijiang999) > <wangzhijiang...@aliyun.com.invalid> wrote: > Hi all! > > > The shuffle service is responsible for transporting upstream produced data > to the downstream side.
In flink, the NettyServer is used as the network > > transport service, and this component is started in the TaskManager process. > That means the TaskManager can only support an internal shuffle service, which > raises some concerns: > 1. If a task finishes, the ResultPartition of this task remains > registered in the TaskManager, because the output buffers have to be > transported by the internal shuffle service in the TaskManager. That means the > TaskManager cannot be released by the ResourceManager until the ResultPartition is > released. It may waste container resources and cannot support dynamic > resource scenarios well. > 2. If we want to add another shuffle service implementation, the > current mechanism is not easy to extend, because the output level (result > partition) and transport level (shuffle service) are not divided clearly > and lack an abstraction to be extended. > > For the above considerations, we propose the external shuffle service, which > can be deployed in any other external containers, e.g. a NodeManager > > container in yarn. Then the TaskManager can be released ASAP if needed when > all its internal tasks have finished. The persistent output files of these > finished tasks can be served for transport by the external shuffle service on > the same machine. > > Further, we can abstract both the output level and the transport level to > > support different implementations. E.g. we realized merging the data of all > > the subpartitions into a limited number of persistent local files for disk improvements > in some scenarios, instead of one-subpartition-one-file. > > I know it may be a big piece of work, and I just point out some > ideas and wish to get any feedback from you! > > Best, > Zhijiang