Ok, sorry about that :-). I misunderstood, as I am not familiar with Scala
code. Just curious, though: how are you passing two separate flat map
functions to flatMap on the connected stream? The ConnectedStreams
interface expects a single CoFlatMapFunction there -
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html
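
As far as I can tell from the Javadoc, one object has to handle both inputs,
something along these lines (a rough Scala sketch on my part; "Out" is just a
placeholder output type):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// one function covering both inputs of the connected stream
class MyCoFlatMap extends CoFlatMapFunction[EventA, EventB, Out] {
  // called for elements coming from the first input (EventA)
  override def flatMap1(a: EventA, out: Collector[Out]): Unit = ???
  // called for elements coming from the second input (EventB)
  override def flatMap2(b: EventB, out: Collector[Out]): Unit = ???
}

So I am not sure where the second function you pass comes into play.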

Sameer

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:

> Let's say I have two types sharing the same trait
>
> trait Event {
>   def id: Id
> }
>
> case class EventA(id: Id, info: InfoA) extends Event
> case class EventB(id: Id, info: InfoB) extends Event
>
> Each of these events gets pushed to a Kafka topic and gets consumed by a
> stream in Flink.
>
> Let's say I have two streams
>
> Events of type A create state:
>
> val typeAStream = env.addSource(...)
>   .flatMap(someUnmarshallerForA)
>   .keyBy(_.id)
>   .mapWithState(...)
>
> val typeBStream = env.addSource(...)
>   .flatMap(someUnmarshallerForB)
>   .keyBy(_.id)
>
> I now want to process the events in typeBStream using the information
> stored in the state of typeAStream.
>
> One approach would be to use a single stream for both topics and then
> pattern match (roughly the sketch below), but Event subclasses may grow
> in number and may have different loads, so I may want to keep things
> separate.
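>
> A rough sketch of that single-stream alternative (Enriched is a made-up
> output type, and every new Event subtype would need its own case):
>
> import org.apache.flink.streaming.api.scala._
>
> case class Enriched(id: Id, b: InfoB, a: Option[InfoA])
>
> val allEvents: DataStream[Event] = env.addSource(...)  // both topics to Event
>
> val enriched: DataStream[Enriched] = allEvents
>   .keyBy(_.id)
>   .flatMapWithState[Enriched, InfoA] { (event, lastA) =>
>     event match {
>       // A only updates the state, emits nothing
>       case EventA(_, info)  => (Seq.empty, Some(info))
>       // B reads whatever A info is already there for this id
>       case EventB(id, info) => (Seq(Enriched(id, info, lastA)), lastA)
>     }
>   }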
>
> Would something along the lines of:
>
> typeAStream.connect(typeBStream)
>   .flatMap(
>     new IdentityFlatMapFunction(),
>     new SomeRichFlatMapFunctionForEventB[EventB, O]
>       with StatefulFunction[EventB, O, G[EventA]] { ... }
>   )
>
> work?
>
> I tried this approach and ended up with an NPE, because the state object
> was not initialized (meaning it was not there).
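>
> (For context, my suspicion is that the state would have to live inside the
> co-operator itself rather than in typeAStream's mapWithState. Below is a
> rough sketch of that shape; all names are made up, the descriptor
> arguments depend on the Flink version, and it assumes the A side still
> carries EventA elements before mapWithState.)
>
> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.util.Collector
>
> case class Joined(b: EventB, a: Option[InfoA])  // made-up output type
>
> class JoinAB extends RichCoFlatMapFunction[EventA, EventB, Joined] {
>   // keyed state holding the latest InfoA seen for the current id
>   private var lastA: ValueState[InfoA] = _
>
>   override def open(parameters: Configuration): Unit = {
>     lastA = getRuntimeContext.getState(
>       new ValueStateDescriptor[InfoA]("last-a", classOf[InfoA]))
>   }
>
>   // A side: only update the state
>   override def flatMap1(a: EventA, out: Collector[Joined]): Unit =
>     lastA.update(a.info)
>
>   // B side: read whatever A info exists for this id
>   override def flatMap2(b: EventB, out: Collector[Joined]): Unit =
>     out.collect(Joined(b, Option(lastA.value())))
> }
>
> // hypothetical: rawAStream would be the A events before mapWithState;
> // both inputs need to be keyed by id for the keyed state to resolve
> rawAStream.keyBy(_.id)
>   .connect(typeBStream)
>   .flatMap(new JoinAB)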
>
>
> Thanks,
> Aris
>
>
