Hi Aljoscha,

I removed the business objects, logic, etc., so I am happy to post it here :) I am 
sure this is a common issue when you start to seriously mess with state.


Assuming a type Output for the output, and assuming that there is a function 
(EventA => String) in the mapWithState operator of typeAStream (implying the 
state is just a Seq[String] per key):

def coFun = new CoFlatMapFunction[EventA, EventB, Option[Output]] {

  override def flatMap1(in: EventA, out: Collector[Option[Output]]) =
    out.collect(None)

  override def flatMap2(in: EventB, out: Collector[Option[Output]]) = {

    new RichFlatMapFunction[EventB, Option[Output]]
        with StatefulFunction[EventB, Option[Output], Seq[String]] {

      lazy val stateTypeInfo: TypeInformation[Seq[String]] =
        implicitly[TypeInformation[Seq[String]]]
      lazy val serializer: TypeSerializer[Seq[String]] =
        stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
      override lazy val stateSerializer: TypeSerializer[Seq[String]] = serializer

      override def flatMap(in: EventB, out: Collector[Option[Output]]): Unit = {
        out.collect(
          applyWithState(
            in,
            (in, state) =>
              (state match {
                case None => None
                case Some(s) => Some(Output(...))
              }, state)
          )
        )
      }

      flatMap(in, out)

    }
  }
}

applyWithState throws the exception, and my intuition says I am doing something 
seriously wrong in the instantiation. I tried to make something work using the 
mapWithState implementation as a guide, and I ended up here.

Thanks,
Aris
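
A minimal sketch of one possible restructuring, not taken from the thread. The 
anonymous RichFlatMapFunction above is created inside flatMap2 and is never 
opened by Flink, so its state handle is most likely still null when 
applyWithState runs. Keyed state is also scoped to the operator that registers 
it, so the state written by mapWithState on typeAStream is not visible from 
another operator. The sketch therefore keeps both inputs and the state in a 
single RichCoFlatMapFunction on the connected, keyed streams. It assumes a 
Flink 1.x release with the Scala DataStream API, open(Configuration), and the 
(name, TypeInformation) constructor of ValueStateDescriptor. The name 
JoinWithAState, the String ids, and the fields of Output are hypothetical 
stand-ins for the types elided above.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the business types removed from the thread.
case class EventA(id: String, info: String)
case class EventB(id: String, info: String)
case class Output(id: String, seen: Seq[String])

object JoinWithAState {
  // Pure per-key logic, kept separate so it can be checked without a Flink runtime.
  def append(state: Option[Seq[String]], a: EventA): Seq[String] =
    state.getOrElse(Seq.empty) :+ a.info

  def respond(state: Option[Seq[String]], b: EventB): Option[Output] =
    state.map(s => Output(b.id, s))
}

class JoinWithAState extends RichCoFlatMapFunction[EventA, EventB, Output] {

  // Keyed state handle; assigned in open(), once Flink has set the runtime context.
  @transient private var seen: ValueState[Seq[String]] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new ValueStateDescriptor[Seq[String]](
      "seen-a-events", createTypeInformation[Seq[String]])
    seen = getRuntimeContext.getState(descriptor)
  }

  // EventA only updates the state for the current key and emits nothing.
  override def flatMap1(a: EventA, out: Collector[Output]): Unit =
    seen.update(JoinWithAState.append(Option(seen.value()), a))

  // EventB reads the state for the same key and emits only if state exists.
  override def flatMap2(b: EventB, out: Collector[Output]): Unit =
    JoinWithAState.respond(Option(seen.value()), b).foreach(o => out.collect(o))
}

object Wiring {
  // Both inputs are keyed by id so that they address the same keyed state.
  def wire(as: DataStream[EventA], bs: DataStream[EventB]): DataStream[Output] =
    as.connect(bs).keyBy(_.id, _.id).flatMap(new JoinWithAState)
}
```

Because flatMap2 emits zero or one Output per EventB, the Option wrapper and the 
later flatten step are no longer needed in this variant. The state update that 
mapWithState performed on typeAStream moves into flatMap1.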

________________________________
From: Aljoscha Krettek <aljos...@apache.org>
Sent: Tuesday, August 30, 2016 10:06 AM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Hi Aris,
I think you're on the right track with using a CoFlatMap for this. Could you 
maybe post the code of your CoFlatMapFunction (or send it to me privately if 
you have concerns about posting it publicly)? Then I could have a look.

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 15:48 aris kol <gizera...@hotmail.com> wrote:

Any other opinion on this?


Thanks :)

Aris

From: aris kol <gizera...@hotmail.com>
Sent: Sunday, August 28, 2016 12:04 AM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

In the implementation I am passing just one CoFlatMapFunction, where flatMap1, 
which operates on EventA, just emits a None (practically a no-op), and flatMap2 
tries to access the state and throws the NPE.

It wouldn't make sense to use a mapper in this context, since I would still 
want to flatten afterwards before pushing downstream.


Aris


________________________________
From: Sameer W <sam...@axiomine.com>
Sent: Saturday, August 27, 2016 11:40 PM
To: user@flink.apache.org
Subject: Re: Accessing state in connected streams

Ok, sorry about that :-). I misunderstood, as I am not familiar with Scala code. 
Just curious though: how are you passing two MapFunctions to the flatMap 
function on the connected stream? The interface of ConnectedStreams requires 
just one CoFlatMapFunction: 
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.html

Sameer

On Sat, Aug 27, 2016 at 6:13 PM, aris kol <gizera...@hotmail.com> wrote:

Let's say I have two types sharing the same trait

trait Event {
def id: Id
}

case class EventA(id: Id, info: InfoA) extends Event
case class EventB(id: Id, info: InfoB) extends Event

Each of these events gets pushed to a Kafka topic and gets consumed by a stream 
in Flink.

Let's say I have two streams

Events of type A create state:

val typeAStream = env.addSource(...)
.flatMap(someUnmarshallerForA)
.keyBy(_.id)
.mapWithState(...)

val typeBStream = env.addSource(...)
.flatMap(someUnmarshallerForB)
.keyBy(_.id)

I now want to process the events in typeBStream using the information stored in 
the state of typeAStream.

One approach would be to use the same stream for the two topics and then 
pattern match, but the Event subclasses may grow in number and may have 
different loads, so I may want to keep things separate.

Would something along the lines of:

typeAStream.connect(typeBStream).
flatMap(
new IdentityFlatMapFunction(),
new SomeRichFlatMapFunctionForEventB[EventB, O]
  with StatefulFunction[EventB, O, G[EventA]] { ... }
)

work?

I tried this approach and ended up with an NPE because the state object was not 
initialized (meaning it was not there).


Thanks,
Aris

