Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture
Hi Biao, you're right. What you've described is a totally valid use case and we should design the interfaces such that you can have specialized implementations for the cases where you can exploit things like a common DFS. I think Nico's design should include this.

Cheers,
Till

On Fri, Jun 16, 2017 at 4:10 PM, Biao Liu wrote:
> Hi Till,
>
> I agree with you about Flink's DC; it is indeed another topic. I just thought that we could think more about it before refactoring the BLOB service, to make sure that it is easy to implement the DC on the refactored architecture.
>
> I have another question about the BLOB service. Can we abstract the BLOB service into some high-level interfaces? Maybe just some put/get methods in the interfaces. Being easy to extend will be useful in some scenarios.
>
> For example, in YARN mode there are some features that interest us:
> 1. YARN can localize files only once per slave machine, so all TMs of the same job can share these files. That may save a lot of bandwidth for large-scale jobs or jobs with large BLOBs.
> 2. We can skip uploading files if they are already on DFS. That's a common scenario with a distributed cache.
> 3. Going further, we actually don't need a BlobServer component in YARN mode at all; we can rely on DFS to distribute files. There is always a DFS available in a YARN cluster.
>
> If we do so, the BLOB service over the network can be the default implementation. It would work in any situation, and it clearly would not depend on Hadoop explicitly. We could then optimize for different kinds of clusters without any hacking.
>
> Those are just some rough ideas, but I think well-abstracted interfaces will be very helpful.
Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture
Hi Till,

I agree with you about Flink's DC; it is indeed another topic. I just thought that we could think more about it before refactoring the BLOB service, to make sure that it is easy to implement the DC on the refactored architecture.

I have another question about the BLOB service. Can we abstract the BLOB service into some high-level interfaces? Maybe just some put/get methods in the interfaces. Being easy to extend will be useful in some scenarios.

For example, in YARN mode there are some features that interest us:
1. YARN can localize files only once per slave machine, so all TMs of the same job can share these files. That may save a lot of bandwidth for large-scale jobs or jobs with large BLOBs.
2. We can skip uploading files if they are already on DFS. That's a common scenario with a distributed cache.
3. Going further, we actually don't need a BlobServer component in YARN mode at all; we can rely on DFS to distribute files. There is always a DFS available in a YARN cluster.

If we do so, the BLOB service over the network can be the default implementation. It would work in any situation, and it clearly would not depend on Hadoop explicitly. We could then optimize for different kinds of clusters without any hacking.

Those are just some rough ideas, but I think well-abstracted interfaces will be very helpful.
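The put/get abstraction suggested above could be sketched roughly as follows. This is only an illustration of the idea from the mail, not Flink's actual API; all names (`BlobService`, `InMemoryBlobService`, the key scheme) are hypothetical, and a real version would declare `IOException` on its methods.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical high-level BLOB service abstraction with just put/get/delete. */
interface BlobService {
    /** Stores the given bytes and returns a key under which they can be fetched. */
    String put(byte[] data);

    /** Retrieves the bytes stored under the given key. */
    byte[] get(String key);

    /** Removes the blob stored under the given key. */
    void delete(String key);
}

/** Stand-in for the default network-based implementation (here an in-memory map).
 *  A YARN/DFS-backed variant would implement the same interface but resolve keys
 *  to paths on the shared file system, so no BlobServer component is needed. */
class InMemoryBlobService implements BlobService {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();
    private int counter = 0;

    @Override
    public synchronized String put(byte[] data) {
        String key = "blob-" + (counter++); // illustrative key scheme
        store.put(key, data.clone());
        return key;
    }

    @Override
    public byte[] get(String key) {
        byte[] data = store.get(key);
        if (data == null) {
            throw new IllegalStateException("unknown blob key: " + key);
        }
        return data.clone();
    }

    @Override
    public void delete(String key) {
        store.remove(key);
    }
}
```

With such an interface, the cluster-specific optimizations from points 1-3 become alternative implementations rather than special cases hacked into the transfer code.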
Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture
Hi Biao, you're right that the BlobServer won't live in the JM in FLIP-6. Instead, it will either be part of the RM or of the dispatcher component, depending on the actual implementation. The requirements for the BlobServer should, however, be the same.

Concerning the question about Flink's distributed cache, I think this is an orthogonal topic. We should be able to piggy-back on the BlobServer once we have refactored it. This should simplify the DC's implementation a bit.

Cheers,
Till

On Thu, Jun 15, 2017 at 9:16 AM, Biao Liu wrote:
> I share Chesnay Schepler's concern. AFAIK, Flink does not support the DC as well as MapReduce and Spark do: we only support the DC in the DataSet API, and the DC in Flink does not support local files. Is this a good chance to refactor the DC too?
>
> I have another concern: currently the BLOB server conflicts with the FLIP-6 architecture. In FLIP-6 we start the JM while submitting the job instead of starting it beforehand. If the BLOB server is embedded in the JM, we cannot upload jars and files before the JM has started, but we actually need the jars uploaded before starting the JM. Correct me if I am wrong.
>
> To solve this problem, we could separate job submission into different stages, or we could make the BLOB server an independent component parallel to the RM.
>
> Maybe we can think more about these in FLIP-19. What do you think? @Nico
Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture
I share Chesnay Schepler's concern. AFAIK, Flink does not support the DC as well as MapReduce and Spark do: we only support the DC in the DataSet API, and the DC in Flink does not support local files. Is this a good chance to refactor the DC too?

I have another concern: currently the BLOB server conflicts with the FLIP-6 architecture. In FLIP-6 we start the JM while submitting the job instead of starting it beforehand. If the BLOB server is embedded in the JM, we cannot upload jars and files before the JM has started, but we actually need the jars uploaded before starting the JM. Correct me if I am wrong.

To solve this problem, we could separate job submission into different stages, or we could make the BLOB server an independent component parallel to the RM.

Maybe we can think more about these in FLIP-19. What do you think? @Nico
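The staged-submission idea above could look roughly like the sketch below: blobs go to a standalone blob store in stage one, and only then is the job (and with it the JM) started. This is a hypothetical illustration of the ordering constraint, not FLIP-6's actual client code; all class and method names are invented.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-stage submission client: jars are uploaded to an
 *  independent blob store BEFORE the JobManager exists (FLIP-6 style). */
class StagedSubmissionClient {
    enum Stage { CREATED, BLOBS_UPLOADED, JOB_SUBMITTED }

    private Stage stage = Stage.CREATED;
    private final List<String> uploadedKeys = new ArrayList<>();

    /** Stage 1: upload jars/files to the blob store, which lives parallel
     *  to the RM rather than inside the not-yet-started JM. */
    List<String> uploadBlobs(List<byte[]> jars) {
        for (byte[] jar : jars) {
            uploadedKeys.add("blob-" + uploadedKeys.size()); // stand-in for a real blob key
        }
        stage = Stage.BLOBS_UPLOADED;
        return uploadedKeys;
    }

    /** Stage 2: submit the job graph; the freshly started JM can then fetch
     *  the already-uploaded blobs via their keys. */
    void submitJob(String jobName) {
        if (stage != Stage.BLOBS_UPLOADED) {
            throw new IllegalStateException(
                    "jars must be uploaded before the JM is started");
        }
        stage = Stage.JOB_SUBMITTED;
    }
}
```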
Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture
The DC does delete local files. The user requests the file through the RuntimeContext, but the download and local caching are completely handled by Flink.

The problems you mention are exactly why I'm suggesting to rebuild the DC on top of the Blob service. If a user currently wants to use the DC for a local file, he has to do the following:
1. upload the file to DFS
2. register the file with Flink
3. delete the distributed file

But in step 2, couldn't we load the file into the Blob storage and remove it once the job finishes?

On 14.06.2017 15:23, Nico Kruber wrote:
> Actually, I don't see the automatic cleanup you referred to. From what I can see around the DistributedCache class and its use, it is simply a registry for user files whose life cycle is completely managed by the user (upload, download, delete). Files may even reside outside of Flink's control and survive job termination. Therefore, it is a different use case which can be (mis-)used as some sort of blob storage. It...
> (1) will not work if there is no distributed/commonly-accessible file system,
> (2) does not provide life-cycle management for distributed files,
> (3) does not delete local files (this gets especially complicated if files are shared, e.g. multiple tasks at one TaskManager running the same job with the same libraries),
> ...
>
> Nico
>
> On Wednesday, 14 June 2017 12:29:55 CEST Chesnay Schepler wrote:
> > One thing I was wondering about for a long time now is why the distributed cache is not implemented via the blob store.
> >
> > The DC is essentially just a copy routine with local caching and automatic cleanup, so basically what the blob store is supposed to do (I guess).
> >
> > On 13.06.2017 16:31, Nico Kruber wrote:
> > > Hi all,
> > > I'd like to initiate a discussion on some architectural changes for the BLOB server which finally add a proper cleanup story, remove some dead code, and extend the BLOB store's use for off-loaded (large) RPC messages.
> > >
> > > Please have a look at FLIP-19 that details the proposed changes:
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
> > >
> > > Regards
> > > Nico
> > >
> > > PS: While doing the re-write, I'd also like to fix some concurrency issues in the current code.
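The "load it into the Blob storage and remove it once the job finishes" idea could be sketched as below. The blob store is faked with a map; `BlobBackedDistributedCache` and all its method names are hypothetical, only meant to show the job-scoped life cycle that replaces the manual upload/delete steps.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: the DC registers local files directly in the blob
 *  store and releases them when the owning job finishes. */
class BlobBackedDistributedCache {
    private final Map<String, byte[]> blobStore = new HashMap<>();      // stand-in blob service
    private final Map<String, Set<String>> keysPerJob = new HashMap<>(); // tracks ownership

    /** Replaces steps 1-3 from the mail: the local file goes straight
     *  into blob storage, tagged with the job that registered it. */
    String registerCachedFile(String jobId, byte[] localFileContents) {
        String key = "blob-" + blobStore.size(); // illustrative key scheme
        blobStore.put(key, localFileContents);
        keysPerJob.computeIfAbsent(jobId, j -> new HashSet<>()).add(key);
        return key; // tasks later fetch the file under this key
    }

    /** Automatic cleanup: once the job finishes, all its files disappear. */
    void onJobFinished(String jobId) {
        Set<String> keys = keysPerJob.remove(jobId);
        if (keys != null) {
            keys.forEach(blobStore::remove);
        }
    }

    boolean contains(String key) {
        return blobStore.containsKey(key);
    }
}
```

This would also address Nico's point (2): the distributed files would get a life cycle tied to the job instead of being managed manually by the user.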
Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture
Actually, I don't see the automatic cleanup you referred to. From what I can see around the DistributedCache class and its use, it is simply a registry for user files whose life cycle is completely managed by the user (upload, download, delete). Files may even reside outside of Flink's control and survive job termination. Therefore, it is a different use case which can be (mis-)used as some sort of blob storage. It...
(1) will not work if there is no distributed/commonly-accessible file system,
(2) does not provide life-cycle management for distributed files,
(3) does not delete local files (this gets especially complicated if files are shared, e.g. multiple tasks at one TaskManager running the same job with the same libraries),
...

Nico

On Wednesday, 14 June 2017 12:29:55 CEST Chesnay Schepler wrote:
> One thing I was wondering about for a long time now is why the distributed cache is not implemented via the blob store.
>
> The DC is essentially just a copy routine with local caching and automatic cleanup, so basically what the blob store is supposed to do (I guess).
>
> On 13.06.2017 16:31, Nico Kruber wrote:
> > Hi all,
> > I'd like to initiate a discussion on some architectural changes for the BLOB server which finally add a proper cleanup story, remove some dead code, and extend the BLOB store's use for off-loaded (large) RPC messages.
> >
> > Please have a look at FLIP-19 that details the proposed changes:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
> >
> > Regards
> > Nico
> >
> > PS: While doing the re-write, I'd also like to fix some concurrency issues in the current code.
Re: [DISCUSS] FLIP-19: Improved BLOB storage architecture
One thing I was wondering about for a long time now is why the distributed cache is not implemented via the blob store.

The DC is essentially just a copy routine with local caching and automatic cleanup, so basically what the blob store is supposed to do (I guess).

On 13.06.2017 16:31, Nico Kruber wrote:
> Hi all,
> I'd like to initiate a discussion on some architectural changes for the BLOB server which finally add a proper cleanup story, remove some dead code, and extend the BLOB store's use for off-loaded (large) RPC messages.
>
> Please have a look at FLIP-19 that details the proposed changes:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
> Regards
> Nico
>
> PS: While doing the re-write, I'd also like to fix some concurrency issues in the current code.
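The "copy routine with local caching" that the DC and the blob store have in common boils down to fetch-once-then-reuse. A minimal sketch, with the remote side simulated in memory and all names (`LocalCachingFetcher`, `fetch`) invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical local-caching fetch: a blob is downloaded once per machine
 *  and served from the local cache on every later request. */
class LocalCachingFetcher {
    private final Map<String, byte[]> localCache = new HashMap<>();
    private int remoteFetches = 0; // counts actual downloads, for illustration

    /** Simulated remote read; a real implementation would contact the
     *  BlobServer or a DFS here. */
    private byte[] readRemote(String key, byte[] remoteContents) {
        remoteFetches++;
        return remoteContents;
    }

    /** Returns the cached copy, downloading only on the first request. */
    byte[] fetch(String key, byte[] remoteContents) {
        return localCache.computeIfAbsent(key, k -> readRemote(k, remoteContents));
    }

    int remoteFetchCount() {
        return remoteFetches;
    }
}
```

Since both components need exactly this behavior (plus cleanup), building the DC on top of the blob store would avoid maintaining the routine twice.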