Hi Nick,
The following is an example (I have not run it; it is only meant to explain
the idea). I use `KeyedBroadcastProcessFunction` because I saw that your
code uses keyed state.
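private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast
        extends KeyedBroadcastProcessFunction<String, String, Integer, String> {

    private static final long serialVersionUID = 7496674620398203933L;

    // Keyed state that caches elements arriving before any broadcast element.
    private final ListStateDescriptor<String> listStateDesc =
            new ListStateDescriptor<>("cache element",
                    BasicTypeInfo.STRING_TYPE_INFO);

    // Broadcast state; the name and the key/value types are placeholders.
    private final MapStateDescriptor<String, Integer> broadcastStateDesc =
            new MapStateDescriptor<>("your broadcast state",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    @Override
    public void processBroadcastElement(Integer value, Context ctx,
            Collector<String> out) throws Exception {
        // Store the broadcast value so processElement stops caching
        // (the key "latest" is only a placeholder).
        ctx.getBroadcastState(broadcastStateDesc).put("latest", value);
        ctx.applyToKeyedState(
                listStateDesc,
                new KeyedStateFunction<String, ListState<String>>() {
                    @Override
                    public void process(String key, ListState<String> state)
                            throws Exception {
                        // apply your logic to the cached state and the
                        // broadcast value, then clear the state
                        state.clear();
                    }
                });
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx,
            Collector<String> out) throws Exception {
        ReadOnlyBroadcastState<String, Integer> broadcastState =
                ctx.getBroadcastState(broadcastStateDesc);
        if (!broadcastState.immutableEntries().iterator().hasNext()) {
            // the broadcast state is still empty: cache the element
            getRuntimeContext().getListState(listStateDesc).add(value);
        } else {
            // do your business logic with the broadcast state and your value
        }
    }
}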
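
To make the order of events easier to follow, here is a small plain-Java
model of the same "cache until the first broadcast element arrives" idea. It
does not use Flink at all: the class `CacheUntilBroadcast`, its fields, and
the `apply` helper are hypothetical names made up for this illustration, the
map stands in for the per-key `ListState`, and the list stands in for the
`Collector`. Treat it as a sketch of the control flow, not as how Flink
implements it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Flink) of caching elements until a broadcast value exists. */
class CacheUntilBroadcast {
    // Stands in for the broadcast state; null means nothing was broadcast yet.
    private Integer broadcastValue;
    // Stands in for the per-key ListState used as a cache.
    private final Map<String, List<String>> cacheByKey = new LinkedHashMap<>();
    // Stands in for the Collector.
    private final List<String> output = new ArrayList<>();

    /** Models processElement: cache while the broadcast side is empty, else process. */
    void processElement(String key, String value) {
        if (broadcastValue == null) {
            cacheByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            output.add(apply(key, value, broadcastValue));
        }
    }

    /** Models processBroadcastElement: store the value, then flush every key's cache. */
    void processBroadcastElement(int value) {
        broadcastValue = value;
        // This loop plays the role of applyToKeyedState: visit the state of each key.
        cacheByKey.forEach((key, cached) -> {
            for (String element : cached) {
                output.add(apply(key, element, value));
            }
        });
        cacheByKey.clear();
    }

    // Placeholder for the real business logic.
    private static String apply(String key, String value, int broadcast) {
        return key + ":" + value + ":" + broadcast;
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        CacheUntilBroadcast f = new CacheUntilBroadcast();
        f.processElement("a", "x");   // cached, nothing broadcast yet
        f.processElement("b", "y");   // cached
        f.processBroadcastElement(7); // flushes both cached elements
        f.processElement("a", "z");   // processed immediately
        System.out.println(f.output()); // prints [a:x:7, b:y:7, a:z:7]
    }
}
```

In the real job the map and the flush loop are replaced by keyed state and
`applyToKeyedState`, so the cache is checkpointed and partitioned by key.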
Best,
Guowei
On Wed, Jan 27, 2021 at 4:31 AM Nick Bendtner <[email protected]> wrote:
> Thanks a lot Guowei, that makes sense. I will go with the caching
> approach. Can you point me to an example that shows the most efficient
> way to cache elements?
> Thanks a ton for your help.
>
> Best,
> Nick
>
> On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma <[email protected]> wrote:
>
>> Hi Nick,
>> I do not think you can update `myState` in `processBroadcastElement`.
>> Updating keyed state requires a current key, and there is no key in
>> `processBroadcastElement`.
>> Best,
>> Guowei
>>
>>
>> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <[email protected]> wrote:
>>
>>> Hi Guowei,
>>> I am not using a keyed broadcast function, I use [1]. My question is:
>>> can non-broadcast state, for instance value state or map state, be updated
>>> whenever I get a broadcast event in *processBroadcastElement*? This way
>>> the state updates are consistent, since each instance of the task gets the
>>> same broadcast element.
>>>
>>> ```
>>> private MapState<String, MyState> myState;
>>>
>>> @Override
>>> public void processElement(InputType value, ReadOnlyContext ctx,
>>>         Collector<OutputType> out) throws Exception {
>>>     // Iterate over map state.
>>>     for (Map.Entry<String, MyState> entry : myState.entries()) {
>>>         // Business logic
>>>     }
>>>     // Do things
>>> }
>>>
>>> @Override
>>> public void processBroadcastElement(BroadcastedStateType value,
>>>         Context ctx, Collector<OutputType> out) throws Exception {
>>>     // Update map state which is not a broadcast state.
>>>     // Same update in every sub operator.
>>>     myState.put(value.ID(), value.state()); // value from broadcast
>>> }
>>>
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context)
>>>         throws Exception {
>>>     // called when it's time to save state
>>>     myState.clear();
>>>     // Update myState with current application state
>>> }
>>>
>>> @Override
>>> public void initializeState(FunctionInitializationContext context)
>>>         throws Exception {
>>>     // called when things start up, possibly recovering from an error
>>>     descriptor = new MapStateDescriptor<>("state", Types.STRING,
>>>             Types.POJO(BroadcastedStateType.class));
>>>     myState = context.getKeyedStateStore().getMapState(descriptor);
>>>     if (context.isRestored()) {
>>>         // restore application state from myState
>>>     }
>>> }
>>> ```
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>>> .
>>>
>>>
>>> Best,
>>> Nick.
>>>
>>> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <[email protected]> wrote:
>>>
>>>> Hi Nick,
>>>> Normally you cannot iterate over all the keyed state, but with broadcast
>>>> state and `applyToKeyedState` you can.
>>>> For example, before any broadcast-side element arrives, you might cache
>>>> the non-broadcast elements in keyed state. Once the broadcast elements
>>>> arrive, you use `applyToKeyedState` [1] to iterate over all the elements
>>>> you "cached" in the keyed state and run your business logic.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks Guowei. Another question I have is: what is the use of
>>>>> broadcast state when I can update a map state or value state inside the
>>>>> process broadcast element method and use that state to do a lookup in the
>>>>> process element method, as in this example?
>>>>>
>>>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>>>
>>>>>
>>>>> Best,
>>>>> Nick
>>>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Nick,
>>>>>> You might need to handle it yourself if you have to process an
>>>>>> element only after you get the broadcast state.
>>>>>> For example, you could "cache" the element in state and handle it
>>>>>> once the elements from the broadcast side have arrived. In particular,
>>>>>> if you are using `KeyedBroadcastProcessFunction`, you could use
>>>>>> `applyToKeyedState` to access the elements you cached earlier.
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>> What is the way to initialize broadcast state (say with default
>>>>>>> values) before the first element shows up in the broadcasting stream?
>>>>>>> I do a lookup on the broadcast state to process transactions which come
>>>>>>> from another stream. The problem is the broadcast state is empty until
>>>>>>> the first element shows up.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Nick.
>>>>>>>
>>>>>>