Ok, that makes sense.

> You just need to correct the acc state to what it expects to be (say
> re-evaluate the acc without the record that needs retraction) when you
> received the retraction message.

So for example, if I just remove all items from acc.groupIdSet on retraction,
will it know to clear out the state entirely from rocks? If a user gets deleted
altogether (and my groupby is on user_id), what sort of retraction do I need to
evaluate then? I'm now thinking it will need to delete the state entirely and
pass a full retraction of the state downstream, but I don't know whether
deleting state from rocks happens automatically or whether I need to make it
happen in the retract method somehow.

On Wed, Dec 9, 2020 at 6:16 PM Danny Chan <danny0...@apache.org> wrote:

> No, the group agg, stream-stream join and rank are all stateful
> operators which need a state-backend to bookkeep the acc values.
>
> But it is only required to emit the retractions when the stateful operator
> A has a downstream operator B that is also stateful, because the B needs
> the retractions to correct the accs. If B is not stateful, just emitting
> the new record to override is enough.
>
> You just need to correct the acc state to what it expects to be (say
> re-evaluate the acc without the record that needs retraction) when you
> received the retraction message.
>
> On Thu, Dec 10, 2020 at 2:44 AM Rex Fenley <r...@remind101.com> wrote:
>
>> So from what I'm understanding, the aggregate itself is not a "stateful
>> operator" but one may follow it? How does the aggregate accumulator keep
>> old values then? It can't all just live in memory. Actually, looking at the
>> savepoints, it looks like there's state associated with our aggregate
>> operator.
>>
>> To clarify my concern too, in my retract function impl in the aggregate
>> function class, all I do is remove a value (a group id) from the
>> accumulator set (which is an array). For example, if there is only 1
>> group_id left for a user and it gets deleted, that group_id will be removed
>> from the accumulator set and the set will be empty.
>> I would hope that at
>> that point, given that there are no remaining rows for the aggregate,
>> I could, or Flink would, just delete the associated stored accumulator
>> altogether, i.e. delete `user_id_1 -> []`. Is it possible that both the
>> groups and the user need to be deleted for everything to clear from
>> storage? That might make more sense, actually.
>>
>> If this doesn't happen, since users delete themselves and their groups
>> all the time, we'll be storing all these empty data sets in rocks for no
>> reason. To clarify, we're using Debezium as our source and using Flink as a
>> materialization engine, so we never want to explicitly set a timeout on any
>> of our data; we just want to scale up predictably with our user growth.
>>
>> Thanks!
>>
>> On Wed, Dec 9, 2020 at 4:14 AM Danny Chan <danny0...@apache.org> wrote:
>>
>>> Hi, Rex Fenley ~
>>>
>>> If there is a stateful operator downstream of the aggregate function,
>>> then each time the function receives an update (or delete) for the key, the
>>> agg operator would emit 2 messages, one for retracting the old record, one
>>> for the new message. For your case, the new message is the DELETE.
>>>
>>> If there is no stateful operator, the aggregate operator would just emit
>>> the update after (the new) message which is the delete.
>>>
>>> On Wed, Dec 9, 2020 at 4:30 AM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'd like to better understand delete behavior of AggregateFunctions.
>>>> Let's assume there's an aggregate of `user_id` to a set of `group_ids` for
>>>> groups belonging to that user.
>>>> `user_id_1 -> [group_id_1, group_id_2, etc.]`
>>>> Now let's assume sometime later that deletes arrive for all rows which
>>>> produce user_id_1's group_ids.
>>>>
>>>> Would the aggregate function completely delete the associated state
>>>> from RocksDB or would it leave something like `user_id_1 -> []` sitting in
>>>> RocksDB forever?
>>>>
>>>> We have an aggregate similar to this where users could delete
>>>> themselves and we want to make sure we're not accumulating data forever for
>>>> those users.
>>>>
>>>> Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
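
For reference, the retract logic discussed in this thread can be sketched roughly as a toy model in plain Java. This does not use Flink's API: `RetractSketch`, `GroupIdAcc`, and the `HashMap` standing in for keyed state are all hypothetical names made up for illustration. Removing the key once its accumulator is empty is an assumption of the sketch, not something this thread confirms about how Flink or RocksDB behaves.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: a HashMap stands in for keyed state; none of this is Flink API.
class RetractSketch {

    // Hypothetical accumulator: group_id -> number of live rows that contributed it.
    static final class GroupIdAcc {
        final Map<Long, Integer> refCounts = new HashMap<>();

        // Record one more row carrying this group_id.
        void accumulate(long groupId) {
            refCounts.merge(groupId, 1, Integer::sum);
        }

        // Undo one earlier accumulate; the entry disappears with its last row.
        void retract(long groupId) {
            refCounts.computeIfPresent(
                groupId, (id, n) -> n > 1 ? Integer.valueOf(n - 1) : null);
        }

        // The aggregate result: the set of group ids still present.
        Set<Long> groupIds() {
            return new TreeSet<>(refCounts.keySet());
        }

        boolean isEmpty() {
            return refCounts.isEmpty();
        }
    }

    // Stand-in for per-key state: user_id -> accumulator.
    final Map<Long, GroupIdAcc> state = new HashMap<>();

    // An insert row for (user_id, group_id) arrives.
    void insert(long userId, long groupId) {
        state.computeIfAbsent(userId, k -> new GroupIdAcc()).accumulate(groupId);
    }

    // A delete row arrives: correct the accumulator as if the row never existed.
    void delete(long userId, long groupId) {
        GroupIdAcc acc = state.get(userId);
        if (acc == null) {
            return;
        }
        acc.retract(groupId);
        // Assumption of this sketch: drop the key once nothing is left for it.
        if (acc.isEmpty()) {
            state.remove(userId);
        }
    }

    public static void main(String[] args) {
        RetractSketch s = new RetractSketch();
        s.insert(1L, 10L);
        s.insert(1L, 20L);
        s.delete(1L, 10L);
        System.out.println(s.state.get(1L).groupIds()); // prints [20]
        s.delete(1L, 20L);
        System.out.println(s.state.containsKey(1L));    // prints false
    }
}
```

Counting rows per group id, rather than keeping a bare set, means a retraction for a group id that arrived twice leaves one copy in place. Whether that matters depends on whether the upstream changelog can contain duplicates.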