Hi, I have two Kafka topics ("tracking" and "rules"), and I would like to join the "tracking" DataStream with the "rules" DataStream as data arrives on the "tracking" stream.
The problem with a join is that the rules only “survive” for the length of the window, while I suspect that I want them to survive longer than that so that they can be applied to events arriving in the future. I tested ConnectedStreams with a CoFlatMapFunction, but the result is not what I expect.

*For the execution:*
1) I added 3 rules to the "rules" topic (imei: "01", "02", "03").
2) I sent 15 events with different imei values, but I guess I have a problem with "keyBy".

*Result:*
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n12999/2222222.jpg>

*Code:*

    // Connect the two streams and key both inputs by their "imei" field.
    ConnectedStreams<TrackEvent, RulesEvent> connectedStreams =
        inputEventStream.connect(inputRulesStream).keyBy("imei", "imei");

    DataStream<Tuple2<TrackEvent, RulesEvent>> ds = connectedStreams.flatMap(
        new CoFlatMapFunction<TrackEvent, RulesEvent, Tuple2<TrackEvent, RulesEvent>>() {

            Tuple2<TrackEvent, RulesEvent> t2 = new Tuple2<TrackEvent, RulesEvent>();

            // Called for each element of the "tracking" input.
            @Override
            public void flatMap1(TrackEvent trackEvent,
                    Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                t2.f0 = trackEvent;
                collector.collect(t2);
                // t2 = new Tuple2<TrackEvent, RulesEvent>();
            }

            // Called for each element of the "rules" input.
            @Override
            public void flatMap2(RulesEvent rulesEvent,
                    Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                t2.f1 = rulesEvent;
                // collector.collect(t2);
            }
        });

    ds.printToErr();

Best,
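
To make the intended behavior concrete, here is a minimal plain-Java sketch (no Flink dependency) of a two-input operator that keeps one rule per imei and applies it to later tracking events with the same imei. The class name `KeyedRuleJoin`, the `String` stand-ins for `TrackEvent` and `RulesEvent`, and the sample rule and event values are all hypothetical; it only models the per-key lookup and is not Flink code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed two-input operator; every name here is hypothetical. */
class KeyedRuleJoin {

    // Latest rule seen for each imei. This models per-key state that
    // outlives any window, so rules stay available for future events.
    private final Map<String, String> ruleByImei = new HashMap<>();

    /** Tracking input: emit "event -> rule" only if a rule exists for this imei. */
    void flatMap1(String imei, String trackEvent, List<String> out) {
        String rule = ruleByImei.get(imei);
        if (rule != null) {
            out.add(trackEvent + " -> " + rule);
        }
    }

    /** Rules input: remember (or overwrite) the rule for this imei, emit nothing. */
    void flatMap2(String imei, String rule) {
        ruleByImei.put(imei, rule);
    }

    public static void main(String[] args) {
        KeyedRuleJoin join = new KeyedRuleJoin();
        List<String> out = new ArrayList<>();

        // Three rules, as in the test described above.
        join.flatMap2("01", "rule-01");
        join.flatMap2("02", "rule-02");
        join.flatMap2("03", "rule-03");

        // Tracking events arriving later; imei "04" has no rule and is dropped.
        join.flatMap1("01", "e1", out);
        join.flatMap1("04", "e2", out);
        join.flatMap1("02", "e3", out);

        System.out.println(out); // prints [e1 -> rule-01, e3 -> rule-02]
    }
}
```

In Flink, the closest counterpart to the map above would presumably be keyed state, for example a `ValueState<RulesEvent>` obtained in `open()` of a `RichCoFlatMapFunction` through `getRuntimeContext().getState(new ValueStateDescriptor<>(...))`. Unlike an instance field such as `t2`, which is shared by every key handled by the same parallel subtask, keyed state is scoped to the key of the element currently being processed.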