Hi Nick,
Following is an example (I could not run it; it is just to explain the idea). I use `KeyedBroadcastProcessFunction` because I saw that your code uses keyed state.
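```
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class StatefulFunctionWithKeyedStateAccessedOnBroadcast
        extends KeyedBroadcastProcessFunction<String, String, Integer, String> {

    private static final long serialVersionUID = 7496674620398203933L;

    // Keyed state that caches elements arriving before any broadcast element.
    private final ListStateDescriptor<String> listStateDesc =
            new ListStateDescriptor<>("cache element", BasicTypeInfo.STRING_TYPE_INFO);

    // Broadcast state. The name "broadcast state" and the key "latest" are placeholders;
    // use the same descriptor you pass to stream.broadcast(...).
    private final MapStateDescriptor<String, Integer> broadcastStateDesc =
            new MapStateDescriptor<>("broadcast state",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

    @Override
    public void processBroadcastElement(Integer value, Context ctx, Collector<String> out)
            throws Exception {
        // Store the broadcast value so that later elements can be processed directly.
        ctx.getBroadcastState(broadcastStateDesc).put("latest", value);
        // Visit the cached elements of every key on this parallel instance.
        ctx.applyToKeyedState(listStateDesc, new KeyedStateFunction<String, ListState<String>>() {
            @Override
            public void process(String key, ListState<String> state) throws Exception {
                for (String cached : state.get()) {
                    // Placeholder: do your business logic with the cached element and the broadcast value.
                    out.collect(key + ": " + cached + " / " + value);
                }
                // Clear the cache for this key.
                state.clear();
            }
        });
    }

    @Override
    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastStateDesc);
        Integer latest = broadcastState.get("latest");
        if (latest == null) {
            // No broadcast element yet: cache the element in keyed state.
            getRuntimeContext().getListState(listStateDesc).add(value);
        } else {
            // Placeholder: do your business logic with the broadcast value and this element.
            out.collect(ctx.getCurrentKey() + ": " + value + " / " + latest);
        }
    }
}
```

Best,
Guowei

On Wed, Jan 27, 2021 at 4:31 AM Nick Bendtner <buggi...@gmail.com> wrote:
> Thanks a lot Guowei, that makes sense. I will go with the caching
> approach. Can you point me to any example that shows the most
> efficient way to cache elements?
> Thanks a ton for your help.
>
> Best,
> Nick
>
> On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi Nick,
>> I do not think you can update `myState` in
>> `processBroadcastElement`, because you need a key to update
>> keyed state, and there is no key in `processBroadcastElement`.
>> Best,
>> Guowei
>>
>>
>> On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:
>>
>>> Hi Guowei,
>>> I am not using a keyed broadcast function, I use [1].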
>>> My question is: can a non-broadcast state, for instance value state or map state, be updated
>>> whenever I get a broadcast event in *processBroadcastElement*? This way
>>> the state updates are consistent, since each instance of the task gets the
>>> same broadcast element.
>>>
>>> ```
>>> private MapState<String, MyState> myState;
>>>
>>> @Override
>>> public void processElement(InputType value, ReadOnlyContext ctx,
>>>         Collector<OutputType> out) throws Exception {
>>>     // Iterate over map state.
>>>     myState.entries().forEach(entry -> { /* business logic */ });
>>>
>>>     // Do things
>>> }
>>>
>>> @Override
>>> public void processBroadcastElement(BroadcastedStateType value,
>>>         Context ctx, Collector<OutputType> out) throws Exception {
>>>     // Update map state which is not a broadcast state. Same update in every sub-operator.
>>>     myState.put(value.ID(), value.state()); // Update the map state with the value from the broadcast
>>> }
>>>
>>> @Override
>>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>>     // Called when it's time to save state.
>>>     myState.clear();
>>>     // Update myState with current application state.
>>> }
>>>
>>> @Override
>>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>>     // Called when things start up, possibly recovering from an error.
>>>     MapStateDescriptor<String, MyState> descriptor = new MapStateDescriptor<>("state",
>>>             Types.STRING, Types.POJO(MyState.class));
>>>     myState = context.getKeyedStateStore().getMapState(descriptor);
>>>
>>>     if (context.isRestored()) {
>>>         // Restore application state from myState.
>>>     }
>>> }
>>> ```
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html
>>>
>>>
>>> Best,
>>> Nick.
>>>
>>> On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>
>>>> Hi Nick,
>>>> Normally you cannot iterate over all the keyed states, but
>>>> `BroadcastState` and `applyToKeyedState` let you do that.
>>>> For example, before the broadcast-side elements arrive, you might
>>>> choose to cache the non-broadcast elements in keyed state. After the
>>>> broadcast elements arrive, you use `applyToKeyedState`[1] to iterate over
>>>> all the elements you "cached" in keyed state and run your business
>>>> logic.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html
>>>>
>>>> Best,
>>>> Guowei
>>>>
>>>>
>>>> On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Guowei. Another question I have is: what is the use of broadcast
>>>>> state when I can update a map state or value state inside the
>>>>> processBroadcastElement method and use that state to do a lookup in the
>>>>> processElement method, as in this example?
>>>>>
>>>>> https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate
>>>>>
>>>>>
>>>>> Best,
>>>>> Nick
>>>>> On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Nick,
>>>>>> You might need to handle it yourself if you have to process an
>>>>>> element only after you get the broadcast state.
>>>>>> For example, you could “cache” the element in state and handle
>>>>>> it when the broadcast-side elements arrive. In particular,
>>>>>> if you are using `KeyedBroadcastProcessFunction`, you can use
>>>>>> `applyToKeyedState` to access the elements you cached earlier.
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>> What is the way to initialize broadcast state (say, with default
>>>>>>> values) before the first element shows up in the broadcast stream? I
>>>>>>> do a lookup on the broadcast state to process transactions that come from
>>>>>>> another stream. The problem is that the broadcast state is empty until the
>>>>>>> first element shows up.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Nick.
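The "cache until the broadcast side arrives" idea discussed in this thread can be tried without a Flink cluster using a plain-Java model, sketched below. It does not use Flink at all: `BroadcastCacheModel`, its methods, and the `"rules=N"` output format are hypothetical. A map of per-key lists stands in for keyed `ListState`, another map stands in for broadcast state, and the loop in `processBroadcastElement` only mimics what `applyToKeyedState` does by visiting every key's cache. It models a single parallel instance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of one parallel instance of the operator (no Flink dependency).
class BroadcastCacheModel {
    // Stand-in for keyed ListState<String>: one cache list per key.
    // LinkedHashMap only makes this model deterministic; Flink does not promise a key order.
    private final Map<String, List<String>> perKeyCache = new LinkedHashMap<>();
    // Stand-in for broadcast MapState<String, Integer>.
    private final Map<String, Integer> broadcastState = new LinkedHashMap<>();
    // Records emitted by the modeled operator, in emission order.
    private final List<String> output = new ArrayList<>();

    // Mirrors processElement: cache while no broadcast data exists, otherwise process directly.
    void processElement(String key, String value) {
        if (broadcastState.isEmpty()) {
            perKeyCache.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        } else {
            output.add(businessLogic(key, value));
        }
    }

    // Mirrors processBroadcastElement: store the broadcast value, then drain every key's cache.
    void processBroadcastElement(String ruleName, int ruleValue) {
        broadcastState.put(ruleName, ruleValue);
        for (Map.Entry<String, List<String>> entry : perKeyCache.entrySet()) {
            for (String cached : entry.getValue()) {
                output.add(businessLogic(entry.getKey(), cached));
            }
            entry.getValue().clear(); // like state.clear() for that key
        }
    }

    // Number of elements still waiting in the cache across all keys.
    int cachedCount() {
        int n = 0;
        for (List<String> values : perKeyCache.values()) {
            n += values.size();
        }
        return n;
    }

    List<String> getOutput() {
        return new ArrayList<>(output);
    }

    // Placeholder business logic: tag each element with the number of known broadcast entries.
    private String businessLogic(String key, String value) {
        return key + ":" + value + " (rules=" + broadcastState.size() + ")";
    }

    public static void main(String[] args) {
        BroadcastCacheModel op = new BroadcastCacheModel();
        op.processElement("a", "x1");
        op.processElement("b", "y1");
        System.out.println(op.getOutput()); // prints "[]"
        op.processBroadcastElement("threshold", 10);
        System.out.println(op.getOutput()); // prints "[a:x1 (rules=1), b:y1 (rules=1)]"
        op.processElement("a", "x2");
        System.out.println(op.getOutput()); // prints "[a:x1 (rules=1), b:y1 (rules=1), a:x2 (rules=1)]"
    }
}
```

Nothing is emitted until the first broadcast element arrives; at that point the cached elements of every key are flushed, and later elements bypass the cache. In a real job the cache lives in Flink keyed state, so it is checkpointed along with the broadcast state.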