Hi Dawid,

Thanks for following up on this.

Let me know if you’d like me to update the documentation; that seems pretty 
straightforward :)

Adding support for compressing broadcast state feels more challenging, but I 
could take a swing at it if you’d like.

Regards,

— Ken

PS: Re the key serializer, I was looking at a hack where I keep the previous 
record around so I can do delta encoding… but that’s also fragile.
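To make the PS concrete, here is a minimal Java sketch of delta (front) encoding, assuming sorted string keys. The class and method names are hypothetical, and this is not a Flink `TypeSerializer`. Each key is stored as the length of the prefix it shares with the previous key plus the remaining suffix. Decoding therefore only works if entries are read back in exactly the order they were written, which is presumably part of what makes the hack fragile.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical front (prefix) delta encoding of string keys; not a Flink TypeSerializer.
final class PrefixDeltaCodec {

    // One encoded key: length of the prefix shared with the previous key, plus the new suffix.
    static final class Entry {
        final int sharedPrefix;
        final String suffix;

        Entry(int sharedPrefix, String suffix) {
            this.sharedPrefix = sharedPrefix;
            this.suffix = suffix;
        }

        @Override
        public String toString() {
            return sharedPrefix + "|" + suffix;
        }
    }

    static List<Entry> encode(List<String> keys) {
        List<Entry> encoded = new ArrayList<>();
        String previous = ""; // the "previous record" the encoder has to keep around
        for (String key : keys) {
            int shared = commonPrefixLength(previous, key);
            encoded.add(new Entry(shared, key.substring(shared)));
            previous = key;
        }
        return encoded;
    }

    static List<String> decode(List<Entry> encoded) {
        List<String> keys = new ArrayList<>();
        String previous = ""; // decoding needs the same state, in the same order
        for (Entry entry : encoded) {
            String key = previous.substring(0, entry.sharedPrefix) + entry.suffix;
            keys.add(key);
            previous = key;
        }
        return keys;
    }

    private static int commonPrefixLength(String a, String b) {
        int max = Math.min(a.length(), b.length());
        int i = 0;
        while (i < max && a.charAt(i) == b.charAt(i)) {
            i++;
        }
        return i;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("user:1001", "user:1002", "user:1010", "user:2000");
        List<Entry> encoded = encode(keys);
        System.out.println(encoded);                      // [0|user:1001, 8|2, 7|10, 5|2000]
        System.out.println(decode(encoded).equals(keys)); // true
    }
}
```

Note that a single entry such as `8|2` is meaningless on its own, without the key that preceded it.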

> On Nov 21, 2022, at 1:36 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> 
> > And yes, I read “Compression works on the granularity of key-groups in keyed 
> > state” as meaning “When compressing keyed state, it’s done per key-group” and 
> > not “Compression only works on keyed state” :)
> 
> Totally agree. The docs could definitely be more straightforward about that. I 
> created FLINK-30112 [1] to improve that.
> 
> > I agree that “KeyedState should be preferred in majority of cases”.
> > Unfortunately, for a broadcast stream there’s no option to use keyed state, 
> > right?
> 
> Absolutely. I mentioned this just as an excuse for why we did not invest much 
> in OperatorState.
> 
> > So assuming that’s the current situation, and I’m not in a position to have 
> > my client deploy a patched version of Flink, which of the following (sketchy) 
> > ideas has any potential here…
> 
> I am afraid it won't be easy to hack the compression in. 
> 
> Your suggestion 2 won't help, IMO, as the compression would be applied to each 
> entry separately. I believe that's not what you're looking for.
> 
> Option 1 has more potential, as it would apply compression to the entire 
> state. As you said, though, this would require class overloading, which is 
> always fragile.
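A rough way to see why option 2 helps so little. This uses only `java.util.zip`, not Flink's own compression code. The sketch compresses each entry separately, which is roughly what a compressing key serializer (option 2) would do. It then compresses all entries as one stream, which is roughly what compressing the whole state (option 1) would do. The sample entries are made up and deliberately repetitive, and real savings depend on the data.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DeflaterOutputStream;

// Compares per-entry compression with whole-state compression using plain java.util.zip.
final class CompressionGranularityDemo {

    // Deflate-compresses the input (zlib format) and returns the compressed bytes.
    static byte[] deflate(byte[] input) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
            out.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Each entry becomes its own compressed stream (roughly option 2).
    static int perEntrySize(List<String> entries) {
        int total = 0;
        for (String entry : entries) {
            total += deflate(entry.getBytes(StandardCharsets.UTF_8)).length;
        }
        return total;
    }

    // All entries go through a single compressed stream (roughly option 1).
    static int wholeStateSize(List<String> entries) {
        ByteArrayOutputStream all = new ByteArrayOutputStream();
        for (String entry : entries) {
            byte[] b = entry.getBytes(StandardCharsets.UTF_8);
            all.write(b, 0, b.length);
        }
        return deflate(all.toByteArray()).length;
    }

    // Made-up, deliberately repetitive entries standing in for broadcast state.
    static List<String> sampleEntries(int n) {
        List<String> entries = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            entries.add("{\"ruleId\":" + i + ",\"type\":\"threshold\",\"field\":\"amount\"}");
        }
        return entries;
    }

    public static void main(String[] args) {
        List<String> entries = sampleEntries(1000);
        System.out.println("per-entry:   " + perEntrySize(entries) + " bytes");
        System.out.println("whole state: " + wholeStateSize(entries) + " bytes");
    }
}
```

With small, similar entries the per-entry variant gains little. Each tiny stream pays its own zlib header and checksum, and it cannot reference repeats that occur in other entries.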
> 
> To be honest, I can't think of a better way atm.
> 
> Btw, I believe being able to apply compression to operator state is a valid 
> request, so I created FLINK-30113 [2].
> 
> [1] https://issues.apache.org/jira/browse/FLINK-30112
> [2] https://issues.apache.org/jira/browse/FLINK-30113
> 
> On 18/11/2022 02:13, Ken Krugler wrote:
>> Hi Dawid,
>> 
>> Thanks for getting back to me.
>> 
>> And yes, I read “Compression works on the granularity of key-groups in keyed 
>> state” as meaning “When compressing keyed state, it’s done per key-group” 
>> and not “Compression only works on keyed state” :)
>> 
>> I agree that “KeyedState should be preferred in majority of cases”.
>> Unfortunately, for a broadcast stream there’s no option to use keyed state, 
>> right?
>> 
>> So assuming that’s the current situation, and I’m not in a position to have 
>> my client deploy a patched version of Flink, which of the following 
>> (sketchy) ideas has any potential here…
>> 
>> 1. Implement a version of HeapBroadcastState that compresses the state, and 
>> rely on Flink’s classloader finding it in my jar first.
>> 
>> 2. Register a custom compressing serializer for my state’s key class, 
>> assuming that will get picked up by the call to 
>> stateMetaInfo.getKeySerializer().
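Option 1 essentially means wrapping the snapshot output for the whole broadcast map in a single compressing stream. Below is a minimal sketch of that idea with plain JDK streams, assuming string keys and values. `CompressedMapSnapshot`, `write`, and `read` are hypothetical names, not Flink's `HeapBroadcastState` API. A real implementation would go through Flink's own serializers and snapshot streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Hypothetical sketch of option 1: compress the whole broadcast map as one stream.
// Not Flink's HeapBroadcastState API; real code would use Flink's serializers and snapshot streams.
final class CompressedMapSnapshot {

    static byte[] write(Map<String, String> state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Every byte written below goes through the same Deflater, so repeats across entries compress.
        try (DataOutputStream out = new DataOutputStream(new DeflaterOutputStream(bytes))) {
            out.writeInt(state.size());
            for (Map.Entry<String, String> entry : state.entrySet()) {
                out.writeUTF(entry.getKey());   // writeUTF caps each string at 65,535 encoded bytes
                out.writeUTF(entry.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // closing the streams above finished the deflate stream
    }

    static Map<String, String> read(byte[] snapshot) {
        try (DataInputStream in = new DataInputStream(
                new InflaterInputStream(new ByteArrayInputStream(snapshot)))) {
            int size = in.readInt();
            Map<String, String> state = new LinkedHashMap<>();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                state.put(key, value);
            }
            return state;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> state = new LinkedHashMap<>();
        for (int i = 0; i < 500; i++) {
            state.put("rule-" + i, "type=threshold;field=amount;limit=" + (i * 10));
        }
        byte[] snapshot = write(state);
        System.out.println("compressed snapshot: " + snapshot.length + " bytes");
        System.out.println("round trip ok: " + read(snapshot).equals(state));
    }
}
```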
>> 
>> Or something else?
>> 
>> Thanks!
>> 
>> — Ken
>> 
>> 
>>> On Nov 17, 2022, at 12:06 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>> 
>>> Cross-posting my answer from SO:
>>> 
>>> BroadcastState is an operator state, not a KeyedState. The referenced docs 
>>> refer to KeyedState:
>>> 
>>> Compression works on the granularity of key-groups in keyed state,
>>> 
>>> The docs could probably be more explicit about this behaviour.
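For contrast, the keyed-state behaviour that the docs describe works roughly like this: keys are bucketed by key group, and each bucket is compressed as its own block, presumably so that each key group can be read independently when state is redistributed. This is a simplified sketch with hypothetical names. The key-group assignment here is just `floorMod(hashCode, maxParallelism)`, whereas Flink applies a murmur hash to the key's hash code before taking the modulus.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.DeflaterOutputStream;

// Simplified picture of per-key-group compression in keyed state; names are hypothetical.
final class KeyGroupCompressionSketch {

    // Simplified assignment; Flink murmur-hashes key.hashCode() before taking the modulus.
    static int keyGroupFor(String key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Buckets keys by key group, then compresses each bucket as an independent block.
    static Map<Integer, byte[]> compressPerKeyGroup(List<String> keys, int maxParallelism) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String key : keys) {
            groups.computeIfAbsent(keyGroupFor(key, maxParallelism), g -> new ArrayList<>()).add(key);
        }
        Map<Integer, byte[]> blocks = new TreeMap<>();
        for (Map.Entry<Integer, List<String>> group : groups.entrySet()) {
            byte[] raw = String.join("\n", group.getValue()).getBytes(StandardCharsets.UTF_8);
            blocks.put(group.getKey(), deflate(raw));
        }
        return blocks;
    }

    private static byte[] deflate(byte[] input) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DeflaterOutputStream out = new DeflaterOutputStream(bytes)) {
            out.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            keys.add("user-" + i);
        }
        Map<Integer, byte[]> blocks = compressPerKeyGroup(keys, 128);
        System.out.println(blocks.size() + " independently compressed key-group blocks");
    }
}
```

Broadcast state has no key groups to bucket by, which is one way to see why the per-key-group mechanism doesn't carry over to it.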
>>> 
>>> Unfortunately, as far as I know, there is no compression for OperatorState. 
>>> I am not 100% sure, but I believe it has just never been implemented, 
>>> because we did not want to invest in it, as KeyedState should be preferred 
>>> in the majority of cases.
>>> 
>>> Best,
>>> 
>>> Dawid
>>> 
>>> On 16/11/2022 23:27, Ken Krugler wrote:
>>>> Hi all,
>>>> 
>>>> Just posted this question on SO: How to enable compression for Flink 
>>>> broadcast state checkpoints 
>>>> <https://stackoverflow.com/q/74466988/231762?sem=2>
>>>> Basically, it doesn’t look like broadcast state respects the compressed 
>>>> state (checkpoints/savepoints) setting, but I might be reading the code 
>>>> wrong.
>>>> 
>>>> Hoping someone (like Dawid Wysakowicz) can chime in here, thanks!
>>>> 
>>>> — Ken
>>>> 
>>>> --------------------------
>>>> Ken Krugler
>>>> http://www.scaleunlimited.com
>>>> Custom big data solutions
>>>> Flink, Pinot, Solr, Elasticsearch
>>>> 
