Re: Flink Filters have state?
Thanks Till! This was helpful!

Tim

On Fri, Nov 8, 2019, 7:16 AM Till Rohrmann wrote:
Re: Flink Filters have state?
Hi Tim,

1. The filter is stored within the JobGraph, which is persisted to durable storage if HA is enabled. Usually this is HDFS, S3 or another highly available file system. It is just a serialized POJO. If you want to store your filter's state, you need to use Flink's state API [1].

2. Unless you use Flink's state API, Flink won't be able to recover the numElementsSeen field.

3. I think stateful filters are OK to use if your filter needs to be stateful. Statefulness usually complicates things, so if your function can be stateless, I would recommend making it stateless. However, some applications strictly require statefulness.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html

Cheers,
Till

On Thu, Nov 7, 2019 at 2:11 PM Timothy Victor wrote:
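A plain-Java way to see points 1 and 2 in action. This is a simulation, not Flink's actual code path. It relies on the fact that Flink user functions are Serializable, and it models "job submission" as one Java serialization of the function and "recovery" as deserializing that same snapshot again. The body of MyThreshold (a single int limit) and the byte-array snapshot are hypothetical stand-ins. No Flink classes are used.

```java
import java.io.*;

public class JobGraphSnapshotDemo {

    // Hypothetical body for MyThreshold; it must be Serializable because it
    // is a field of a serializable function.
    static final class MyThreshold implements Serializable {
        final int limit;
        MyThreshold(int limit) { this.limit = limit; }
    }

    // Plain-Java stand-in for ThresholdFilter. Flink's Function interface
    // extends Serializable, which is mimicked here without depending on Flink.
    static final class ThresholdFilter implements Serializable {
        private final MyThreshold threshold;
        private int numElementsSeen; // ordinary field, NOT Flink managed state

        ThresholdFilter(MyThreshold threshold) { this.threshold = threshold; }

        boolean filter(int value) {
            numElementsSeen++;
            return value >= threshold.limit;
        }

        int numElementsSeen() { return numElementsSeen; }
        int limit() { return threshold.limit; }
    }

    // Simulated "job submission": snapshot the function once, as in the JobGraph.
    static byte[] serialize(Serializable function) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(function);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // stream is closed (and flushed) at this point
    }

    // Simulated task deployment or recovery: rebuild the function from the snapshot.
    static ThresholdFilter deserialize(byte[] snapshot) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
            return (ThresholdFilter) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] snapshot = serialize(new ThresholdFilter(new MyThreshold(10)));

        // The running task processes a few elements and increments its counter.
        ThresholdFilter running = deserialize(snapshot);
        for (int v : new int[] {5, 12, 20}) {
            running.filter(v);
        }
        System.out.println("before failure: " + running.numElementsSeen());

        // Without managed state, "recovery" starts again from the submitted snapshot.
        ThresholdFilter recovered = deserialize(snapshot);
        System.out.println("after recovery: " + recovered.numElementsSeen()
                + ", threshold: " + recovered.limit());
    }
}
```

Running main prints "before failure: 3" and then "after recovery: 0, threshold: 10". The constructor argument survives because it is part of the serialized function. The counter would only survive if it were kept in Flink managed state (for example via CheckpointedFunction or keyed state, as described in [1]).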
Flink Filters have state?
I have a FilterFunction implementation which accepts an argument in its constructor and stores it as an instance member. For example:

    class ThresholdFilter implements FilterFunction {

        private final MyThreshold threshold;

        private int numElementsSeen;

        public ThresholdFilter(MyThreshold threshold) {
            this.threshold = threshold;
        }

        // ...
    }

The filter uses the threshold to decide whether or not to filter the incoming element.

All this works, but I have some gaps in my understanding.

1. How is this filter stored and recovered in the case of a failure? Is it just serialized as a POJO and stored in the configured state backend?

2. When recovered, will it maintain the state of all members? (Note that I have a numElementsSeen member in the filter which keeps incrementing for each element received.)

3. Is this sort of thing even advisable for a filter? I'm guessing filters are meant to be reusable across operator instances, in which case the state could be wrong after recovery?

Thanks in advance

Tim
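To make the third question concrete, here is a plain-Java simulation (again not Flink code) of how parallel instances relate to one function object. The function is serialized once and each simulated subtask deserializes its own copy, which mirrors how each parallel subtask of a Flink operator works with its own instance of the user function. As a result, numElementsSeen counts only the elements routed to that copy. The MyThreshold body, the round-robin routing and the parallelism of 2 are assumptions chosen for illustration.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class ParallelCopiesDemo {

    // Hypothetical body for MyThreshold.
    static final class MyThreshold implements Serializable {
        final int limit;
        MyThreshold(int limit) { this.limit = limit; }
    }

    // Plain-Java stand-in for ThresholdFilter (no Flink dependency).
    static final class ThresholdFilter implements Serializable {
        private final MyThreshold threshold;
        private int numElementsSeen;

        ThresholdFilter(MyThreshold threshold) { this.threshold = threshold; }

        boolean filter(int value) {
            numElementsSeen++;
            return value >= threshold.limit;
        }

        int numElementsSeen() { return numElementsSeen; }
    }

    // Serialize the prototype once, then deserialize one independent copy
    // per simulated subtask.
    static List<ThresholdFilter> instantiate(ThresholdFilter prototype, int parallelism) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(prototype);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] snapshot = bytes.toByteArray();

        List<ThresholdFilter> copies = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(snapshot))) {
                copies.add((ThresholdFilter) in.readObject());
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
        return copies;
    }

    public static void main(String[] args) {
        List<ThresholdFilter> subtasks = instantiate(new ThresholdFilter(new MyThreshold(10)), 2);

        // Route elements round-robin across the simulated subtasks.
        int[] input = {3, 11, 15, 8, 42};
        for (int i = 0; i < input.length; i++) {
            subtasks.get(i % subtasks.size()).filter(input[i]);
        }
        for (int i = 0; i < subtasks.size(); i++) {
            System.out.println("subtask " + i + " saw " + subtasks.get(i).numElementsSeen());
        }
    }
}
```

With five elements routed round-robin, main prints "subtask 0 saw 3" and "subtask 1 saw 2". Neither copy sees the total of 5, so a per-instance counter like this is only meaningful per subtask.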