I created a new descriptor and a second rules stream, used them in the second process function, and this works fine.
public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor2 =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));

BroadcastStream<Rule> rulesStream =
        rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor);
BroadcastStream<Rule> rulesStream2 =
        rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor2);

SingleOutputStreamOperator<Keyed<RawEvent, String, Integer>> keyedSingleOutputStream =
        rawEventStream
                .connect(rulesStream)
                .process(new DynamicKeyFunction())
                .name("keyed").uid("keyed").setParallelism(5);

SingleOutputStreamOperator<RTEvent> rtEventDataStream =
        keyedSingleOutputStream
                .keyBy((keyed) -> keyed.getKey())
                .connect(rulesStream2)
                .process(new DynamicTransformFunction())
                .name("rt").uid("rt").setParallelism(5);
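For completeness, the process functions themselves are not shown above. A minimal sketch of what DynamicTransformFunction could look like on this wiring (the class body is an assumption for illustration; only the descriptor and the stream types come from the code above, and Rule.getRuleId() is a hypothetical accessor for the Integer key):

import java.util.Map;

import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicTransformFunction extends
        KeyedBroadcastProcessFunction<String, Keyed<RawEvent, String, Integer>, Rule, RTEvent> {

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<RTEvent> out)
            throws Exception {
        // Writable view of the broadcast state: store/refresh each rule as it
        // arrives. getRuleId() is assumed -- use whatever returns the Integer key.
        ctx.getBroadcastState(TransformDescriptors.Descriptors.rulesDescriptor2)
                .put(rule.getRuleId(), rule);
    }

    @Override
    public void processElement(Keyed<RawEvent, String, Integer> keyed,
            ReadOnlyContext ctx, Collector<RTEvent> out) throws Exception {
        // Read-only view: the descriptor passed here has to match the one the
        // stream was broadcast with, otherwise the state lookup fails.
        ReadOnlyBroadcastState<Integer, Rule> rules =
                ctx.getBroadcastState(TransformDescriptors.Descriptors.rulesDescriptor2);
        for (Map.Entry<Integer, Rule> entry : rules.immutableEntries()) {
            // evaluate entry.getValue() against keyed and emit RTEvents
            // (matching logic omitted)
        }
    }
}

Note that getBroadcastState() resolves the state by the descriptor's name and type, and each operator keeps its own copy of the broadcast state, so the two descriptors above (both named "rules") still refer to independent state in the two process functions.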
On Fri, Feb 26, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> I have no idea what's going on. There is no mechanism in DataStream to
> react to deleted records.
>
> Can you reproduce it locally and debug through it?
>
> On Wed, Feb 24, 2021 at 5:21 PM bat man <tintin0...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> The Flink application was not restarted. I had checked on that.
>> By adding rules to the state of the process function, do you mean the
>> state that is local to the keyed process function?
>> From [1], what is being done here:
>>
>> final MapState<String, List<Item>> state =
>>         getRuntimeContext().getMapState(mapStateDesc);
>> state.put(ruleName, stored);
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>
>> Thanks.
>>
>> On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Could you double-check whether your Flink application was restarted
>>> between the time the Kafka topic was cleared and the time you saw that
>>> the rules had been lost?
>>>
>>> I suspect that you deleted the Kafka topic and the Flink application
>>> then failed and restarted. Upon restart it read the empty rule topic.
>>>
>>> To solve it, you probably want to add the rules to the state of your
>>> process function [1]. If you have done that, I'm a bit lost.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>
>>> On Wed, Feb 24, 2021 at 7:30 AM bat man <tintin0...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> My code is below.
>>>> As mentioned earlier, the rulesStream is used again in later
>>>> processing. Below you can see that rulesStream is connected a second
>>>> time, to the result stream of the first process function. Do you think
>>>> this is the reason the rules operator's state is getting overwritten
>>>> when the data in Kafka is deleted?
>>>> My question is: if the data is no longer present in Kafka, then no
>>>> data is read from the stream, so how is the existing state being
>>>> updated?
>>>>
>>>> public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
>>>>         new MapStateDescriptor<>(
>>>>                 "rules", BasicTypeInfo.INT_TYPE_INFO,
>>>>                 TypeInformation.of(Rule.class));
>>>>
>>>> rawEventKafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>>>> DataStream<RawEvent> rawEventStream =
>>>>         validateData(getRawEventStream(rawEventKafkaSource, env));
>>>>
>>>> rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>>>> DataStream<Rule> rulesDataStream = getRulesStream(rulesKafkaSource, env);
>>>>
>>>> deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>>>> DataStream<Device> deviceDataStream = getDeviceStream(deviceSource, env);
>>>>
>>>> BroadcastStream<Rule> rulesStream = rulesDataStream.broadcast(rulesDescriptor);
>>>>
>>>> SingleOutputStreamOperator<Keyed<RawEvent, String, Integer>> keyedSingleOutputStream =
>>>>         rawEventStream
>>>>                 .connect(rulesStream)
>>>>                 .process(new DynamicKeyFunction())
>>>>                 .name("keyed").uid("keyed").setParallelism(5);
>>>>
>>>> SingleOutputStreamOperator<RTEvent> rtEventDataStream =
>>>>         keyedSingleOutputStream
>>>>                 .keyBy((keyed) -> keyed.getKey())
>>>>                 .connect(rulesStream)
>>>>                 .process(new DynamicTransformFunction())
>>>>                 .name("rt").uid("rt").setParallelism(5);
>>>>
>>>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>>>> khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>>>>> Probably, some operator in your pipeline is re-reading the topic and
>>>>> overwriting the state, dropping what was deleted by Kafka.
>>>>> Could you share the code?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>> On Tue, Feb 23, 2021 at 7:12 AM bat man <tintin0...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have two streams: one with event data and the other with rules. I
>>>>>> broadcast the rules stream and then key the data stream on event
>>>>>> type. The connected stream is processed thereafter.
>>>>>> We faced an issue where the rules data in the topic got deleted
>>>>>> because of the Kafka retention policy.
>>>>>> After this, the existing rules were also dropped from the broadcast
>>>>>> state and processing stopped.
>>>>>>
>>>>>> As per my understanding, the rules that were present in broadcast
>>>>>> state should still exist even if the data was deleted in Kafka, as
>>>>>> the rules data had already been processed and stored in the state
>>>>>> map.
>>>>>>
>>>>>> PS: I'm reusing the rules stream as a broadcast later in processing
>>>>>> as well. Could this be an issue?
>>>>>>
>>>>>> Thanks,
>>>>>> Hemant
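A note on the snippet from [1] quoted above: getRuntimeContext().getMapState(mapStateDesc) returns keyed state, which lives per key on the non-broadcast side and, in the docs' example, buffers items until a matching rule arrives. The rules themselves are written into broadcast state inside processBroadcastElement, and that state is checkpointed with the job, so it outlives the Kafka records. A minimal sketch of the pattern from [1] (Item, Rule, and the String key are the placeholder shapes from the docs' example, not the classes from this pipeline):

import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Item and Rule are the placeholder POJOs from the example in [1].
public class RuleMatcher extends KeyedBroadcastProcessFunction<String, Item, Rule, String> {

    // Keyed state: this is what the quoted getMapState(mapStateDesc) call
    // returns -- one map per key of the data stream, buffering unmatched items.
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
            new MapStateDescriptor<>(
                    "items",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    new ListTypeInfo<>(Item.class));

    // Broadcast state: must use the same descriptor (name) as broadcast().
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
            new MapStateDescriptor<>(
                    "RulesBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value, Context ctx, Collector<String> out)
            throws Exception {
        // The rule is persisted into broadcast state here; once stored, it
        // survives in state (and in checkpoints) even after Kafka expires it.
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value, ReadOnlyContext ctx, Collector<String> out)
            throws Exception {
        // Keyed state access, as in the quoted snippet.
        final MapState<String, List<Item>> state =
                getRuntimeContext().getMapState(mapStateDesc);
        // ... match value against ctx.getBroadcastState(ruleStateDescriptor)
        // and buffer partial matches in `state` (full logic in [1]).
    }
}

The practical takeaway for this thread: once put() has run in processBroadcastElement, the rules live in Flink state rather than in Kafka, so topic retention alone should not clear them -- which is why the suspicion above is that the job restarted and re-read the, by then empty, rule topic.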