Hi Sameer,

Thank you for your quick response.


I don't think event ordering is the problem here; the processor doesn't assume
any ordering.

KeyedStream[EventA] stores a state of type Set[InfoA] on its key, which I would 
like KeyedStream[EventB] to access.

The code operates on an Option[Set[InfoA]] without inviting trouble by invoking 
.get.
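
To make the Option handling concrete, here is a minimal plain-Scala sketch of the pattern. The type aliases for Id, InfoA and InfoB and the helper describeB are hypothetical stand-ins, not the actual job code; the only point is that the missing-state case is matched explicitly instead of calling .get.

```scala
object OptionStateSketch {
  // Hypothetical stand-ins for the real types, which are not shown in this thread.
  type Id = String
  type InfoA = String
  type InfoB = String

  case class EventB(id: Id, info: InfoB)

  // Handles an EventB given whatever state exists for its key.
  // None means that no state has been stored for this key yet.
  def describeB(b: EventB, state: Option[Set[InfoA]]): String =
    state match {
      case Some(infos) => s"${b.id}: ${infos.size} InfoA value(s) known"
      case None        => s"${b.id}: no state yet"
    }
}
```

With this shape, an EventB that arrives before any EventA for the same key takes the None branch rather than failing.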

applyWithState throws the exception because the private ValueState[S] is never 
initialised.

Since KeyedStream[EventA] successfully updates the state, it could be that:

- Something is misconfigured in SomeRichFlatMapFunctionForEventB, which is fine
and can be fixed

- I am doing something completely wrong that Flink doesn't support.


Thanks,

Aris


________________________________
From: Sameer W <sam...@axiomine.com>
Sent: Saturday, August 27, 2016 10:17 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

There is no guarantee about the order in which the elements of each stream arrive in
connected streams. You have to check if the elements have arrived from Stream A
before using the information to process elements from Stream B. Otherwise you 
have to buffer elements from stream B and check if there are unprocessed 
elements from stream B when elements arrive from stream A. You might need to do 
that for elements from both streams depending on how you use them.

You will get an NPE if you assume events have arrived from A but they are
actually lagging behind.
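
The buffering idea can be sketched in plain Scala as a per-key state transition. This is only a model of the logic, not Flink API code: KeyState, Enriched, onA and onB are hypothetical names, and the String type aliases stand in for the real Id, InfoA and InfoB types. It also assumes that "A has arrived" can be read off a non-empty set of InfoA values.

```scala
object BufferingSketch {
  // Hypothetical stand-ins for the real types.
  type Id = String
  type InfoA = String
  type InfoB = String

  case class EventA(id: Id, info: InfoA)
  case class EventB(id: Id, info: InfoB)

  // Per-key state: what stream A has provided so far, plus B events still waiting.
  final case class KeyState(
    infos: Set[InfoA] = Set.empty,
    pendingB: Vector[EventB] = Vector.empty
  )

  // Hypothetical output: a B event paired with the A-side information for its key.
  final case class Enriched(b: EventB, infos: Set[InfoA])

  // An A event updates the state and releases any B events that were waiting.
  def onA(state: KeyState, a: EventA): (KeyState, Vector[Enriched]) = {
    val infos = state.infos + a.info
    val released = state.pendingB.map(b => Enriched(b, infos))
    (KeyState(infos, Vector.empty), released)
  }

  // A B event is processed at once if A has been seen; otherwise it is buffered.
  def onB(state: KeyState, b: EventB): (KeyState, Vector[Enriched]) =
    if (state.infos.isEmpty) (state.copy(pendingB = state.pendingB :+ b), Vector.empty)
    else (state, Vector(Enriched(b, state.infos)))
}
```

In a real job both handlers would have to read and write the same keyed state, so the state must be visible to whichever operator processes both inputs.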

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:

Let's say I have two types sharing the same trait

trait Event {
  def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream 
in Flink.

Let's say I have two streams

Events of type A create state:

val typeAStream = env.addSource(...)
  .flatMap(someUnmarshallerForA)
  .keyBy(_.id)
  .mapWithState(...)

val typeBStream = env.addSource(...)
  .flatMap(someUnmarshallerForB)
  .keyBy(_.id)

I now want to process the events in typeBStream using the information stored in
the State of typeAStream.

One approach would be to use the same stream for the two topics and then
pattern match, but Event subclasses may grow in number and may have different
loads, so I may want to keep things separate.

Would something along the lines of:

typeAStream.connect(typeBStream)
  .flatMap(
    new IdentityFlatMapFunction(),
    new SomeRichFlatMapFunctionForEventB[EventB, O]
      with StatefulFunction[EventB, O, G[EventA]] { ... }
  )

work?

I tried this approach and ended up with an NPE because the state object was not
initialized (meaning it was not there).


Thanks,
Aris

