Hi Nick
The following is an example (I could not run it; it is just to explain the idea).
I use `KeyedBroadcastProcessFunction` because I saw that your code uses
keyed state.

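import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

private static class StatefulFunctionWithKeyedStateAccessedOnBroadcast
        extends KeyedBroadcastProcessFunction<String, String, Integer, String> {

    private static final long serialVersionUID = 7496674620398203933L;

    // Keyed state that caches elements arriving before any broadcast value.
    private final ListStateDescriptor<String> listStateDesc =
            new ListStateDescriptor<>("cache element", BasicTypeInfo.STRING_TYPE_INFO);

    // Broadcast state. Use the same descriptor you pass to broadcast(...);
    // the name and the single "latest" key are placeholders.
    private final MapStateDescriptor<String, Integer> broadcastStateDesc =
            new MapStateDescriptor<>("your broadcast state",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    @Override
    public void processBroadcastElement(Integer value, Context ctx, Collector<String> out)
            throws Exception {
        // Keep the broadcast value for elements that arrive later.
        ctx.getBroadcastState(broadcastStateDesc).put("latest", value);

        // There is no current key here, so visit every key that has cached elements.
        ctx.applyToKeyedState(
                listStateDesc,
                new KeyedStateFunction<String, ListState<String>>() {
                    @Override
                    public void process(String key, ListState<String> state) throws Exception {
                        Iterable<String> cached = state.get();
                        if (cached != null) {
                            for (String element : cached) {
                                // placeholder: do the logic with the cached element and the broadcast value
                                out.collect(element + " -> " + value);
                            }
                        }
                        // clear the cache for this key
                        state.clear();
                    }
                });
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        ReadOnlyBroadcastState<String, Integer> broadcastState =
                ctx.getBroadcastState(broadcastStateDesc);
        if (!broadcastState.immutableEntries().iterator().hasNext()) {
            // no broadcast value yet: cache the element in keyed state
            getRuntimeContext().getListState(listStateDesc).add(value);
        } else {
            // placeholder: do your business logic with the broadcast state and the value
            out.collect(value + " -> " + broadcastState.get("latest"));
        }
    }
}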
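If it helps to see the control flow without a Flink cluster, below is a plain-Java model of the same caching pattern for a single parallel instance. This is only an illustrative sketch, not Flink code: the class `CacheUntilBroadcast` and its methods are hypothetical, a `LinkedHashMap` stands in for the keyed `ListState`, and a nullable field stands in for the broadcast state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of "cache until the broadcast value arrives".
// It does NOT use Flink: the map models keyed ListState, the field models broadcast state.
class CacheUntilBroadcast {
    // key -> elements cached for that key (models keyed ListState); insertion-ordered
    private final Map<String, List<String>> cache = new LinkedHashMap<>();
    // latest broadcast value, null until the first one arrives (models broadcast state)
    private Integer broadcastValue = null;

    // Models processElement: the element's key is known here.
    List<String> processElement(String key, String value) {
        List<String> out = new ArrayList<>();
        if (broadcastValue == null) {
            // no broadcast value yet: cache the element under its key
            cache.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            out.add(key + ":" + value + "->" + broadcastValue);
        }
        return out;
    }

    // Models processBroadcastElement plus applyToKeyedState: there is no current key,
    // so every key with cached elements is visited, flushed, and then cleared.
    List<String> processBroadcastElement(int value) {
        broadcastValue = value;
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : cache.entrySet()) {
            for (String cached : e.getValue()) {
                out.add(e.getKey() + ":" + cached + "->" + value);
            }
        }
        cache.clear();
        return out;
    }
}
```

Clearing each key's cache after flushing mirrors `state.clear()` in the example above, so cached elements are emitted once and the cache does not keep growing after the broadcast value is known.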

Best,
Guowei


On Wed, Jan 27, 2021 at 4:31 AM Nick Bendtner <buggi...@gmail.com> wrote:

> Thanks a lot Guowei, that makes sense. I will go with the caching
> approach. Can you point me to an example that shows the most
> efficient way to cache elements?
> Thanks a ton for your help.
>
> Best,
> Nick
>
> On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi Nick,
>> I do not think you can update `myState` in
>> `processBroadcastElement`, because you need a current key to update
>> keyed state, and there is no key in `processBroadcastElement`.
>> Best,
>> Guowei
>>
>>
>> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:
>>
>>> Hi Guowei,
>>> I am not using a keyed broadcast function; I use [1]. My question is:
>>> can non-broadcast state, for instance value state or map state, be updated
>>> whenever I get a broadcast event in *processBroadcastElement*? That way
>>> the state updates are consistent, since each instance of the task gets the
>>> same broadcast element.
>>>
>>> ```
>>> private MapState<String, MyState> myState;
>>>
>>> @Override
>>> public void processElement(InputType value, ReadOnlyContext ctx,
>>>         Collector<OutputType> out) throws Exception {
>>>     // Iterate over the map state.
>>>     for (Map.Entry<String, MyState> entry : myState.entries()) {
>>>         // Business logic
>>>     }
>>>     // Do things
>>> }
>>>
>>> @Override
>>> public void processBroadcastElement(BroadcastedStateType value,
>>>         Context ctx, Collector<OutputType> out) throws Exception {
>>>     // Update the map state, which is not a broadcast state.
>>>     // The same update happens in every parallel sub-task.
>>>     myState.put(value.ID(), value.state());
>>> }
>>>
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>     // Called when it's time to save state.
>>>     myState.clear();
>>>     // Update myState with the current application state.
>>> }
>>>
>>> @Override
>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>     // Called when things start up, possibly recovering from an error.
>>>     MapStateDescriptor<String, MyState> descriptor = new MapStateDescriptor<>(
>>>             "state", Types.STRING, Types.POJO(MyState.class));
>>>     myState = context.getKeyedStateStore().getMapState(descriptor);
>>>     if (context.isRestored()) {
>>>         // Restore the application state from myState.
>>>     }
>>> }
>>> ```
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>>>
>>>
>>> Best,
>>> Nick.
>>>
>>> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi Nick,
>>>> Normally you cannot iterate over all the keyed states, but
>>>> `BroadcastState` and `applyToKeyedState` make that possible.
>>>> For example, before the broadcast-side elements arrive, you might
>>>> choose to cache the non-broadcast elements in keyed state. After the
>>>> broadcast elements arrive, you need to use `applyToKeyedState` [1] to
>>>> iterate over all the elements you "cached" in keyed state and apply your
>>>> business logic.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Guowei. Another question I have: what is the use of
>>>>> broadcast state when I can update a map state or value state inside the
>>>>> processBroadcastElement method and use that state to do a lookup in the
>>>>> processElement method, as in this example?
>>>>>
>>>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>>>
>>>>>
>>>>> Best,
>>>>> Nick
>>>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Nick,
>>>>>>   You might need to handle it yourself if you have to process an
>>>>>> element only after you get the broadcast state.
>>>>>>   For example, you could “cache” the element in state and handle
>>>>>> it when the broadcast-side elements arrive. In particular,
>>>>>> if you are using `KeyedBroadcastProcessFunction`, you can use
>>>>>> `applyToKeyedState` to access the elements you cached before.
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>> What is the way to initialize broadcast state (say, with default
>>>>>>> values) before the first element shows up in the broadcast stream?
>>>>>>> I do a lookup on the broadcast state to process transactions that come
>>>>>>> from another stream. The problem is that the broadcast state is empty
>>>>>>> until the first element shows up.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Nick.
>>>>>>>
>>>>>>
