Re: [DISCUSS] FLIP-201: Persist local state in working directory
Hi Yun,

I assume that most people will use this feature with k8s-like deployment environments. In theory, though, it works wherever you can establish a stable relationship between volumes and Flink processes. If Flink processes are restarted on different nodes, then of course you need volumes that can be mounted on those nodes to make this feature work.

Cheers,
Till

In reply to: Yun Tang, Mon, Jan 10, 2022 at 9:50 AM
Re: [DISCUSS] FLIP-201: Persist local state in working directory
I think this feature could indeed help speed up recovery in the case of node failure.

It seems this feature would only work well in k8s-like deployment environments?

Best,
Yun Tang

In reply to: David Morávek, Wednesday, January 5, 2022 19:51
Re: [DISCUSS] FLIP-201: Persist local state in working directory
+1, the general direction here seems pretty solid.

D.

In reply to: Till Rohrmann, Wed, Jan 5, 2022 at 11:57 AM
Re: [DISCUSS] FLIP-201: Persist local state in working directory
If there is no further major feedback, I would start the vote soon.

Cheers,
Till

In reply to: Till Rohrmann, Thu, Dec 30, 2021 at 4:28 PM
Re: [DISCUSS] FLIP-201: Persist local state in working directory
Hi David,

Thanks for your feedback.

By graceful shutdown I mean a way to stop the TaskManager and clean up the working directory. At the moment, I think we always kill the process via SIGTERM or SIGKILL. This won't clean up the working directory because such a termination could also originate from a process failure. I think what Niklas is doing is introducing a signal handler that reacts to SIGTERM by disconnecting from the JobMaster.

You are right that by default Flink will now set the RocksDB directory to the working temp directory. Before, it defaulted to the spilling directories. I think this is not a problem because users can still manually configure multiple RocksDB directories via state.backend.rocksdb.localdir. Moreover, I am not sure how well this mechanism works in practice. Flink simply iterates through the directories when creating new RocksDB state backends, without much smartness. If one of the directories is full, Flink won't use another one but will simply fail.

I do see the point of a proper serialization format and I agree that we should eventually implement it. My reasoning was that the PR is already quite big, and I would prefer getting it in and then tackling this problem as a follow-up instead of increasing the scope of the changes further, because the serialization format is not (strictly speaking) required for this feature. I hope that this makes sense.

I will also respond to your PR comments.

Cheers,
Till

In reply to: David Morávek, Thu, Dec 30, 2021 at 4:00 PM
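To make the directory-selection behavior described in this message concrete, here is a minimal sketch in Python. It is not Flink code: the class `RoundRobinDirectoryPicker` and the function `create_backend_dir` are hypothetical names, and the sketch only assumes what the message states, namely that directories are handed out in turn and that a failure on the chosen directory is not retried elsewhere.

```python
import itertools
import os
import tempfile


class RoundRobinDirectoryPicker:
    """Hypothetical helper (not Flink code): hands out directories in turn."""

    def __init__(self, directories):
        if not directories:
            raise ValueError("at least one directory is required")
        self._cycle = itertools.cycle(list(directories))

    def next_directory(self):
        # Plain rotation: free space and load are never inspected.
        return next(self._cycle)


def create_backend_dir(picker, backend_name):
    """Create a sub-directory for one state backend instance.

    If the chosen base directory is unusable (full, missing, read-only),
    the OSError propagates. The sketch deliberately does not fall back
    to another configured directory.
    """
    base = picker.next_directory()
    path = os.path.join(base, backend_name)
    os.mkdir(path)
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as a, tempfile.TemporaryDirectory() as b:
        picker = RoundRobinDirectoryPicker([a, b])
        for name in ("op-0", "op-1", "op-2"):
            print(create_backend_dir(picker, name))
```

With two configured directories, three backends land on the first, the second, and the first directory again. A smarter policy would have to check free space before choosing, which is the missing "smartness" mentioned above.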
Re: [DISCUSS] FLIP-201: Persist local state in working directory
Hi Till,

thanks for drafting the FLIP, it looks really good. I did a quick pass over the PR and it seems to be heading in the right direction.

> It might be required to introduce a graceful shutdown of the TaskExecutor
> in order to support proper cleanup of resources.

This is actively being worked on by Niklas in FLINK-25277 [1].

In the PR, I've seen that you're also replacing the directories for storing the local state with the working directory. Should this be a concern? Is RocksDB, for example, able to leverage multiple mount paths for spreading the load?

I'd also be in favor of introducing a proper (evolving) serialization format right away instead of Java serialization, but no hard feelings if we don't.

[1] https://issues.apache.org/jira/browse/FLINK-25277

Best,
D.

In reply to: Till Rohrmann, Wed, Dec 29, 2021 at 4:58 PM
Re: [DISCUSS] FLIP-201: Persist local state in working directory
I've created a draft PR for the desired changes [1]. It might be easier to look at than the branch.

[1] https://github.com/apache/flink/pull/18237

Cheers,
Till

In reply to: Till Rohrmann, Tue, Dec 28, 2021 at 3:22 PM
[DISCUSS] FLIP-201: Persist local state in working directory
Hi everyone,

I would like to start a discussion about using the working directory to persist local state for faster recovery (FLIP-201) [1]. Persisting the local state will be beneficial if a crashed process is restarted with the same working directory. In this case, Flink does not have to download the state artifacts again and can recover locally.

A POC can be found here [2].

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/wJuqCw
[2] https://github.com/tillrohrmann/flink/tree/FLIP-201

Cheers,
Till
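The recovery idea in this proposal can be illustrated with a small Python sketch. It is an assumption-laden illustration rather than the FLIP's implementation: the function `restore_state`, the `download` callback, and the `local-state/chk-<id>` directory layout are all hypothetical. The only behavior taken from the proposal is that a restarted process first looks in its (unchanged) working directory and downloads state only when nothing usable is found there.

```python
import os
import shutil
import tempfile


def restore_state(working_dir, checkpoint_id, download):
    """Return (path, source) for the state of `checkpoint_id`.

    `working_dir` is assumed to survive a process restart. `download` is a
    hypothetical callable (checkpoint_id, target_dir) that fetches the state
    artifacts from remote storage.
    """
    local = os.path.join(working_dir, "local-state", f"chk-{checkpoint_id}")
    if os.path.isdir(local):
        # Same working directory as before the crash: recover locally.
        return local, "local"

    # Download into a staging directory first so that a crash during the
    # download never leaves a half-written directory that looks complete.
    staging = local + ".inprogress"
    shutil.rmtree(staging, ignore_errors=True)
    os.makedirs(staging)
    download(checkpoint_id, staging)
    os.rename(staging, local)
    return local, "remote"


if __name__ == "__main__":
    def fake_download(checkpoint_id, target_dir):
        # Stand-in for a remote fetch; writes a single placeholder file.
        with open(os.path.join(target_dir, "state.bin"), "w") as f:
            f.write(f"state of checkpoint {checkpoint_id}")

    with tempfile.TemporaryDirectory() as wd:
        print(restore_state(wd, 7, fake_download)[1])  # first start
        print(restore_state(wd, 7, fake_download)[1])  # restart, same directory
```

The second call stands for a restart with the same working directory and skips the download. A restart with a fresh working directory would take the download path again.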