No, the group agg, stream-stream join and rank are all stateful
operators which need a state-backend to bookkeep the acc values.

But it is only required to emit the retractions when the stateful operator
A has a downstream operator B that is also stateful, because the B needs
the retractions to correct the accs. If B is not stateful, just emitting
the new record to override is enough.

You just need to correct the acc state to what it expects to be (say
re-evaluate the acc without the record that needs retraction) when you
received  the retraction message.

Rex Fenley <r...@remind101.com> 于2020年12月10日周四 上午2:44写道:

> So from what I'm understanding, the aggregate itself is not a "stateful
> operator" but one may follow it? How does the aggregate accumulator keep
> old values then? It can't all just live in memory, actually, looking at the
> savepoints it looks like there's state associated with our aggregate
> operator.
>
> To clarify my concern too, in my retract function impl in the aggregate
> function class, all I do is remove a value (a group id) from the
> accumulator set (which is an array). For example, if there is only 1
> group_id left for a user and it gets deleted, that group_id will be removed
> from the accumulator set and the set will be empty. I would hope that at
> that point, given that there are no remaining rows for the aggregate, that
> I could or flink will just delete the associated stored accumulator
> altogether i.e. delete `user_id_1 -> []`. Is it possible that both the
> groups and the user need to be deleted for everything to clear from
> storage? That might make more sense actually..
>
> If this doesn't happen, since users delete themselves and their groups all
> the time, we'll be storing all these empty data sets in rocks for no
> reason. To clarify, we're using Debezium as our source and using Flink as a
> materialization engine, so we never want to explicitly set a timeout on any
> of our data, we just want to scale up predictably with our user growth.
>
> Thanks!
>
> On Wed, Dec 9, 2020 at 4:14 AM Danny Chan <danny0...@apache.org> wrote:
>
>> Hi, Rex Fenley ~
>>
>> If there is stateful operator as the output of the aggregate function.
>> Then each time the function receives an update (or delete) for the key, the
>> agg operator would emit 2 messages, one for retracting the old record, one
>> for the new message. For your case, the new message is the DELETE.
>>
>> If there is no stateful operator, the aggregate operator would just emit
>> the update after (the new) message which is the delete.
>>
>> Rex Fenley <r...@remind101.com> 于2020年12月9日周三 上午4:30写道:
>>
>>> Hello,
>>>
>>> I'd like to better understand delete behavior of AggregateFunctions.
>>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>>> groups belonging to that user.
>>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>>> Now let's assume sometime later that deletes arrive for all rows which
>>> produce user_id_1's group_id's.
>>>
>>> Would the aggregate function completely delete the associated state from
>>> RocksDB or would it leave something like `user_id_1 -> []` sitting in
>>> RocksDB forever?
>>>
>>> We have an aggregate similar to this where users could delete themselves
>>> and we want to make sure we're not accumulating data forever for those
>>> users.
>>>
>>> Thanks!
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>

Reply via email to