Hi Dawid,
I'll try to reproduce the error in the next couple of days. Can you also share 
the value deserializer you use? Also, have you tried even smaller examples in 
the meantime? Did they work?

A general side note on the queryable state "sink" using ListState 
(".asQueryableState(<name>, ListStateDescriptor)"): everything that enters 
this operator is stored forever and never cleaned up. Eventually, the state 
will take up too much memory, so this variant is of limited use. Maybe it 
should even be removed from the API.
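
To make the growth concrete, here is a minimal sketch in plain Scala (no 
Flink involved). AppendOnlyState and LatestValueState are hypothetical 
stand-ins made up for illustration, not Flink classes. The sketch only 
assumes that one record per key arrives per window, as in the job below, and 
compares appending every record with keeping just the latest value per key:

```scala
import scala.collection.mutable

// Hypothetical stand-in for list-style keyed state: every record is kept.
final class AppendOnlyState[K, V] {
  private val store = mutable.Map.empty[K, mutable.ArrayBuffer[V]]

  def add(key: K, value: V): Unit = {
    store.getOrElseUpdate(key, mutable.ArrayBuffer.empty[V]) += value
  }

  // Total number of stored records across all keys.
  def size: Int = store.valuesIterator.map(_.size).sum
}

// Hypothetical stand-in for value-style keyed state: one record per key.
final class LatestValueState[K, V] {
  private val store = mutable.Map.empty[K, V]

  def update(key: K, value: V): Unit = {
    store(key) = value
  }

  // Number of stored records, which equals the number of distinct keys.
  def size: Int = store.size
}

object StateGrowthSketch {
  // Feeds one record per key and window into both stores and
  // returns (append-only size, latest-value size).
  def run(windows: Int): (Int, Int) = {
    val keys = Seq("login", "logout", "button")
    val appendOnly = new AppendOnlyState[String, Int]
    val latest = new LatestValueState[String, Int]
    for (w <- 1 to windows; k <- keys) {
      appendOnly.add(k, w)
      latest.update(k, w)
    }
    (appendOnly.size, latest.size)
  }

  def main(args: Array[String]): Unit = {
    val (listSize, valueSize) = run(3600)
    println(s"append-only entries: $listSize, latest-value entries: $valueSize")
  }
}
```

With one-second windows, run(3600) corresponds to roughly one hour of input: 
the append-only store holds 10800 entries, the latest-value store holds 3. 
The first number keeps growing for as long as the job runs.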


Nico

On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> Hey Ufuk.
> Did you maybe have a chance to look at that problem?
> 
> 2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > the course of the day. From a first impression, this seems like a bug
> > to me.
> > 
> > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > <wysakowicz.da...@gmail.com> wrote:
> > > Hi, I was experimenting with the Queryable State feature and I have
> > > some problems querying the state.
> > > 
> > > The code which I use to produce the queryable state is:
> > >     env.addSource(kafkaConsumer).map(
> > >       e => e match {
> > >         case LoginClickEvent(_, t) => ("login", 1, t)
> > >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> > >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > >       }).keyBy(0).timeWindow(Time.seconds(1))
> > >       .reduce((e1, e2) =>
> > >         (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > >       .keyBy("key")
> > >       .asQueryableState(
> > >         "type-time-series-count",
> > >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > >           "type-time-series-count",
> > >           classOf[KeyedDataPoint[java.lang.Integer]]))
> > > 
> > > As you see it is a rather simple job, in which I try to count events of
> > > different types in windows and then query by event type.
> > > 
> > > In client code I do:
> > >     // Query Flink state
> > >     val future = client.getKvState(jobId, "type-time-series-count",
> > >       key.hashCode, seralizedKey)
> > > 
> > >     // Await async result
> > >     val serializedResult: Array[Byte] = Await.result(
> > >       future, new FiniteDuration(10, duration.SECONDS))
> > > 
> > >     // Deserialize response
> > >     val results = deserializeResponse(serializedResult)
> > > 
> > >     results
> > >   }
> > > 
> > >   private def deserializeResponse(serializedResult: Array[Byte]):
> > >       util.List[KeyedDataPoint[lang.Integer]] = {
> > >     KvStateRequestSerializer.deserializeList(serializedResult,
> > >       getValueSerializer())
> > >   }
> > > 
> > > While debugging the issue, I saw that the first element in the list
> > > gets deserialized correctly, but deserialization fails on the second
> > > one. It seems like the serialized result is broken. Do you have any
> > > idea whether I am doing something wrong or there is a bug?
> > > 
> > > 
> > > The exception I get is:
> > > java.io.EOFException: null
> > > at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > > at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > > at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > > at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > > 
> > > You can browse the exact code at:
> > > https://github.com/dawidwys/flink-intro
> > > 
> > > I would be grateful for any advice.
> > > 
> > > Regards
> > > Dawid Wysakowicz
