Re: Flink Filters have state?

2019-11-08 Timothy Victor
Thanks, Till! This was helpful!

Tim

On Fri, Nov 8, 2019, 7:16 AM Till Rohrmann  wrote:



Re: Flink Filters have state?

2019-11-08 Till Rohrmann
Hi Tim,

1. The filter is stored within the JobGraph, which is written to persistent
storage if HA is enabled. Usually, this is HDFS, S3, or another highly
available file system. It is just a serialized POJO. If you want to store
your filter's state, you need to use Flink's state API [1].
2. Unless you use Flink's state API, Flink won't be able to recover the
numElementsSeen field.
3. I think stateful filters are fine to use if your filter needs to be
stateful. Statefulness usually complicates things, so if your function can
be stateless, I would recommend making it stateless. However, some
applications strictly require statefulness.
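
For points 1 and 2, here is a minimal sketch of how the counter could be made recoverable, assuming Flink's CheckpointedFunction interface with operator ListState. Operator state is used because the counter is per parallel instance, not per key. The Long element type, the plain long threshold standing in for MyThreshold, and the class name CheckpointedThresholdFilter are hypothetical; only the Flink interfaces and classes are real.

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Hypothetical variant of ThresholdFilter whose counter survives failures.
public class CheckpointedThresholdFilter implements FilterFunction<Long>, CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    // Immutable configuration: restored simply by deserializing the function.
    private final long threshold;

    // Working copy of the counter; checkpointed through the state API below.
    private long numElementsSeen;

    // Handle to managed operator state; transient because Flink provides it at runtime.
    private transient ListState<Long> checkpointedCount;

    public CheckpointedThresholdFilter(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(Long value) {
        numElementsSeen++;
        return value >= threshold;
    }

    public long getNumElementsSeen() {
        return numElementsSeen;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the previous checkpoint's value with the current count.
        checkpointedCount.clear();
        checkpointedCount.add(numElementsSeen);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("numElementsSeen", Long.class));
        if (context.isRestored()) {
            // After rescaling, one instance may receive several entries (or none);
            // summing them keeps the total across all instances unchanged.
            numElementsSeen = 0;
            for (Long count : checkpointedCount.get()) {
                numElementsSeen += count;
            }
        }
    }
}
```

The threshold field needs no state API: it is final and comes back with the serialized function, so only the mutable counter goes through snapshotState/initializeState.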

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html

Cheers,
Till

On Thu, Nov 7, 2019 at 2:11 PM Timothy Victor  wrote:



Flink Filters have state?

2019-11-07 Timothy Victor
I have a FilterFunction implementation which accepts an argument in its
constructor and stores it as an instance member. For example:

class ThresholdFilter implements FilterFunction  {

  private final MyThreshold threshold;

  private int numElementsSeen;

  public ThresholdFilter(MyThreshold threshold) {
    this.threshold = threshold;
  }

  // ...

}

The filter uses the threshold in deciding whether or not to filter the
incoming element.

All this works, but I have some gaps in my understanding.

1. How is this filter stored and recovered in case of a failure? Is it
just serialized as a POJO and stored in the configured state backend?

2. When recovered, will it retain the state of all members? (For example,
note that I have a numElementsSeen member in the filter, which keeps
incrementing for each element received.)

3. Is this sort of thing even advisable for a filter? I'm guessing
filters are meant to be reusable across operator instances, in which case
the state could be wrong after recovery?
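
Questions 1 to 3 can be illustrated without Flink at all. The sketch below is a simplified model, not Flink's actual code path. It assumes that the function object is Java-serialized once when the job graph is built, and that each parallel instance, as well as each restart without the state API, works on its own copy deserialized from those same bytes. CountingThresholdFilter and SerializedFilterDemo are hypothetical names, with a long in place of MyThreshold.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializedFilterDemo {

    // Hypothetical stand-in for ThresholdFilter; plain Java, no Flink types.
    public static class CountingThresholdFilter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final long threshold;
        private int numElementsSeen;

        public CountingThresholdFilter(long threshold) {
            this.threshold = threshold;
        }

        public boolean filter(long value) {
            numElementsSeen++;
            return value >= threshold;
        }

        public int getNumElementsSeen() {
            return numElementsSeen;
        }
    }

    // Java-serializes an object, roughly what happens when the job graph is built.
    public static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Creates an independent copy from the serialized bytes.
    public static CountingThresholdFilter deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (CountingThresholdFilter) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the counters of two "parallel instances" and of a "recovered" copy.
    public static int[] simulate() {
        byte[] submitted = serialize(new CountingThresholdFilter(10));

        CountingThresholdFilter instance0 = deserialize(submitted);
        CountingThresholdFilter instance1 = deserialize(submitted);
        instance0.filter(5);
        instance0.filter(15);
        instance1.filter(20);

        // A restart without the state API deserializes the same bytes again.
        CountingThresholdFilter recovered = deserialize(submitted);

        return new int[] {
            instance0.getNumElementsSeen(),
            instance1.getNumElementsSeen(),
            recovered.getNumElementsSeen()
        };
    }

    public static void main(String[] args) {
        int[] counts = simulate();
        System.out.println("instance 0: " + counts[0]);  // prints "instance 0: 2"
        System.out.println("instance 1: " + counts[1]);  // prints "instance 1: 1"
        System.out.println("recovered:  " + counts[2]);  // prints "recovered:  0"
    }
}
```

Each copy counts independently, and the "recovered" copy starts from whatever numElementsSeen was at serialization time (here 0). The same mechanism is why MyThreshold has to be Serializable.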

Thanks in advance

Tim