I think the issue is that t2 is not registered to keyed state, so it is
being shared across all of the keys on that taskmanager.  Take a look at
this article:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state

Basically you need to change t2 to be a
ValueState[Tuple2[TrackEvent,RulesEvent]]
and register it with a ValueStateDescriptor in in the function's open
method.  Then access it using t2.value() and t2.update().

Hopefully that helps.

On Thu, May 4, 2017 at 9:17 AM, Tarek khal <tarek.khal.leta...@gmail.com>
wrote:

> Hi ,
> I have two kafka topics (tracking and rules) and I would like to join
> "tracking" datastream with "rules" datastream as the data arrives in the
> "tracking" datastream.
>
> The problem with a join is that the rules only “survive” for the length of
> the window while I suspect that i want them to survive longer than that so
> that they can be applied to events arriving in the future.
>
> I tested ConnectedStream and CoFlatMapFunction but the result is not as I
> wait.
>
> *For the execution:*
>
> 1) I added 3 rules on "rules" topic (imei: "01","02,"03")
> 2) Perform 15 events with different imei but i guess i have problem with
> "keyby"
>
> *Result : *
>
> <http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/file/n12999/2222222.jpg>
>
> *Code :*
>
> ConnectedStreams<TrackEvent, RulesEvent>  connectedStreams =
> inputEventStream.connect(inputRulesStream).keyBy("imei","imei");
>         DataStream<Tuple2&lt;TrackEvent, RulesEvent>> ds=
> connectedStreams.flatMap(new CoFlatMapFunction<TrackEvent, RulesEvent,
> Tuple2&lt;TrackEvent,RulesEvent>>() {
>             Tuple2<TrackEvent,RulesEvent> t2=new Tuple2<TrackEvent,
> RulesEvent>();
>             @Override
>             public void flatMap1(TrackEvent trackEvent,
> Collector<Tuple2&lt;TrackEvent, RulesEvent>> collector) throws Exception {
>                 t2.f0=trackEvent;
>                 collector.collect(t2);
>                 // t2=new Tuple2<TrackEvent, RulesEvent>();
>             }
>
>             @Override
>             public void flatMap2(RulesEvent rulesEvent,
> Collector<Tuple2&lt;TrackEvent, RulesEvent>> collector) throws Exception {
>                 t2.f1 = rulesEvent;
>                 //collector.collect(t2);
>             }
>         });
>         ds.printToErr();
>
> Best,
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/ConnectedStream-
> keyby-issues-tp12999.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>



-- 
*Jason Brelloch* | Product Developer
3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
<http://www.bettercloud.com/>
Subscribe to the BetterCloud Monitor
<https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
-
Get IT delivered to your inbox

Reply via email to