I created a new descriptor and rules stream and used them in the second
process function, and this works fine.

// Original descriptor, used by the first process function.
public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(Rule.class));

// Separate descriptor instance for the second process function.
public static final MapStateDescriptor<Integer, Rule> rulesDescriptor2 =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(Rule.class));


// Each descriptor gets its own broadcast of the rules stream.
BroadcastStream<Rule> rulesStream =
        rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor);

BroadcastStream<Rule> rulesStream2 =
        rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor2);


// First process function: keys the raw events, using the first broadcast stream.
SingleOutputStreamOperator<Keyed<RawEvent, String, Integer>> keyedSingleOutputStream =
        rawEventStream
                .connect(rulesStream)
                .process(new DynamicKeyFunction())
                .name("keyed").uid("keyed").setParallelism(5);

// Second process function: connects to the second broadcast stream (rulesStream2).
SingleOutputStreamOperator<RTEvent> rtEventDataStream =
        keyedSingleOutputStream
                .keyBy((keyed) -> keyed.getKey())
                .connect(rulesStream2)
                .process(new DynamicTransformFunction())
                .name("rt").uid("rt").setParallelism(5);
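
For completeness, this is roughly how the second process function picks the
rules up from the new descriptor. This is only a minimal sketch, not the exact
production code: the String key type, the Rule.getRuleId() accessor and the
transformation body are assumptions.

import java.util.Map;

import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: key type, getRuleId() and the transform logic are assumed.
public class DynamicTransformFunction extends
        KeyedBroadcastProcessFunction<String, Keyed<RawEvent, String, Integer>, Rule, RTEvent> {

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<RTEvent> out)
            throws Exception {
        // Every rule arriving on the broadcast stream is stored under its id.
        // Nothing here reacts to deletions, so clearing the Kafka topic by
        // itself cannot remove entries that are already in this state.
        ctx.getBroadcastState(TransformDescriptors.Descriptors.rulesDescriptor2)
                .put(rule.getRuleId(), rule);
    }

    @Override
    public void processElement(Keyed<RawEvent, String, Integer> keyed,
            ReadOnlyContext ctx, Collector<RTEvent> out) throws Exception {
        // Read-only view of the same broadcast state; apply each stored rule.
        for (Map.Entry<Integer, Rule> entry :
                ctx.getBroadcastState(TransformDescriptors.Descriptors.rulesDescriptor2)
                        .immutableEntries()) {
            // ... apply entry.getValue() to the keyed event and emit via out ...
        }
    }
}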


On Fri, Feb 26, 2021 at 3:38 PM Arvid Heise <ar...@apache.org> wrote:

> Hi,
>
> I have no idea what's going on. There is no mechanism in DataStream to
> react to deleted records.
>
> Can you reproduce it locally and debug through it?
>
>
>
> On Wed, Feb 24, 2021 at 5:21 PM bat man <tintin0...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> The Flink application was not re-started. I had checked on that.
>> By adding rules to the state of the process function, do you mean the state
>> which is local to the keyed process function?
>> From [1], is this what is being done here -
>>
>> final MapState<String, List<Item>> state =
>>         getRuntimeContext().getMapState(mapStateDesc);
>>
>> state.put(ruleName, stored);
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>
>> Thanks.
>>
>>
>> On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Could you double-check if your Flink application was restarted between
>>> the time the Kafka topic was cleared and the time you saw that the rules
>>> had been lost?
>>>
>>> I suspect that you deleted the Kafka topic and the Flink application
>>> then failed and restarted. Upon restart it read the empty rule topic.
>>>
>>> To solve it, you probably want to add the rules to the state of your
>>> process function [1]. If you have done that, I'm a bit lost.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>
>>> On Wed, Feb 24, 2021 at 7:30 AM bat man <tintin0...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> This is my code below -
>>>> As mentioned earlier, the rulesStream is used again in later processing.
>>>> Below you can see the rulesStream is also connected with the result stream
>>>> of the first process function. Do you think this is the reason the rules
>>>> operator's state gets overridden when the data in Kafka is deleted?
>>>> My question is: if the data is not present in Kafka, then no data is read
>>>> into the stream, so how is the existing state data being updated?
>>>>
>>>> public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
>>>>         new MapStateDescriptor<>(
>>>>                 "rules", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(Rule.class));
>>>>
>>>> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>>>> DataStream<RawEvent> rawEventStream = validateData(getRawEventStream(rawEventKafkaSource, env));
>>>>
>>>> rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>>>> DataStream<Rule> rulesDataStream = getRulesStream(rulesKafkaSource, env);
>>>>
>>>> deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>>>> DataStream<Device> deviceDataStream = getDeviceStream(deviceSource, env);
>>>>
>>>> BroadcastStream<Rule> rulesStream = rulesDataStream.broadcast(rulesDescriptor);
>>>>
>>>> SingleOutputStreamOperator<Keyed<RawEvent, String, Integer>> keyedSingleOutputStream =
>>>>         rawEventStream
>>>>                 .connect(rulesStream)
>>>>                 .process(new DynamicKeyFunction())
>>>>                 .name("keyed").uid("keyed").setParallelism(5);
>>>>
>>>> SingleOutputStreamOperator<RTEvent> rtEventDataStream =
>>>>         keyedSingleOutputStream
>>>>                 .keyBy((keyed) -> keyed.getKey())
>>>>                 .connect(rulesStream)
>>>>                 .process(new DynamicTransformFunction())
>>>>                 .name("rt").uid("rt").setParallelism(5);
>>>>
>>>>
>>>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>>>> khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>>>>> Probably, some operator in your pipeline is re-reading the topic
>>>>> and overwriting the state, dropping what was deleted by Kafka.
>>>>> Could you share the code?
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Tue, Feb 23, 2021 at 7:12 AM bat man <tintin0...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have 2 streams, one with event data and the other with rules. I
>>>>>> broadcast the rules stream and then key the data stream on event type.
>>>>>> The connected stream is processed thereafter.
>>>>>> We faced an issue where the rules data in the topic got deleted
>>>>>> because of the Kafka retention policy.
>>>>>> After this, the existing rules data was also dropped from the broadcast
>>>>>> state and the processing stopped.
>>>>>>
>>>>>> As per my understanding, the rules which were present in the broadcast
>>>>>> state should still exist even if the data was deleted in Kafka, as the
>>>>>> rules data was already processed and stored in the state map.
>>>>>>
>>>>>> PS: I’m reusing the rules stream as a broadcast stream later in the
>>>>>> processing as well. Could this be an issue?
>>>>>>
>>>>>> Thanks,
>>>>>> Hemant
>>>>>>
>>>>>
