Hi,
I have two Kafka topics (tracking and rules) and I would like to join the
"tracking" datastream with the "rules" datastream as data arrives in the
"tracking" datastream.

The problem with a join is that the rules only “survive” for the length of
the window, while I suspect that I want them to survive longer than that so
that they can be applied to events arriving in the future.
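
To make the intent concrete, here is a minimal plain-Java sketch (no Flink
involved) of the behavior I am after: the latest rule for each imei is
remembered indefinitely and applied to every later tracking event with the
same imei. The class name RuleJoinSketch, its methods, and the use of plain
strings in place of TrackEvent and RulesEvent are all hypothetical and exist
only for illustration. I assume that in Flink this would correspond to state
scoped to the key rather than to a window, but I am not sure.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model (hypothetical, no Flink): remembers the latest rule per imei. */
class RuleJoinSketch {
    // Latest rule seen for each imei; entries never expire in this sketch.
    private final Map<String, String> ruleByImei = new HashMap<>();

    /** Called for every element arriving on the "rules" side. */
    void onRule(String imei, String rule) {
        ruleByImei.put(imei, rule);
    }

    /** Called for every element arriving on the "tracking" side. */
    Optional<String> onTrack(String imei, String event) {
        String rule = ruleByImei.get(imei);
        // Emit a joined result only if a rule for this imei is already known.
        return rule == null ? Optional.empty() : Optional.of(event + " + " + rule);
    }

    public static void main(String[] args) {
        RuleJoinSketch join = new RuleJoinSketch();
        join.onRule("01", "rule-01");
        join.onRule("02", "rule-02");
        // An event for imei "01" is paired with the rule stored for "01".
        System.out.println(join.onTrack("01", "event-a"));
        // No rule was stored for imei "03", so nothing is joined.
        System.out.println(join.onTrack("03", "event-b"));
    }
}
```

The point of the sketch is only that the rule lookup is per imei and has no
time limit; it says nothing about how Flink partitions or stores the data.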

I tested ConnectedStreams and CoFlatMapFunction, but the result is not what I
expected.

*For the execution:*

1) I added 3 rules to the "rules" topic (imei: "01", "02", "03").
2) I sent 15 events with different imei values, but I guess I have a problem
with "keyBy".

*Result:*

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n12999/2222222.jpg>
 

*Code :*

ConnectedStreams<TrackEvent, RulesEvent> connectedStreams =
        inputEventStream.connect(inputRulesStream).keyBy("imei", "imei");
DataStream<Tuple2<TrackEvent, RulesEvent>> ds = connectedStreams.flatMap(
        new CoFlatMapFunction<TrackEvent, RulesEvent, Tuple2<TrackEvent, RulesEvent>>() {
            Tuple2<TrackEvent, RulesEvent> t2 = new Tuple2<TrackEvent, RulesEvent>();

            @Override
            public void flatMap1(TrackEvent trackEvent,
                    Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                t2.f0 = trackEvent;
                collector.collect(t2);
                // t2=new Tuple2<TrackEvent, RulesEvent>();
            }

            @Override
            public void flatMap2(RulesEvent rulesEvent,
                    Collector<Tuple2<TrackEvent, RulesEvent>> collector) throws Exception {
                t2.f1 = rulesEvent;
                //collector.collect(t2);
            }
        });
ds.printToErr();

Best,





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-keyby-issues-tp12999.html