Hey Ufuk,
Did you get a chance to have a look at that problem?

2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:

> Hey Dawid! Thanks for reporting this. I will try to have a look over
> the course of the day. From a first impression, this seems like a bug
> to me.
>
> On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> <wysakowicz.da...@gmail.com> wrote:
> > Hi, I was experimenting with the Queryable State feature and I have some
> > problems querying the state.
> >
> > The code which I use to produce the queryable state is:
> >
> >     env.addSource(kafkaConsumer).map(
> >       e => e match {
> >         case LoginClickEvent(_, t) => ("login", 1, t)
> >         case LogoutClickEvent(_, t) => ("logout", 1, t)
> >         case ButtonClickEvent(_, _, t) => ("button", 1, t)
> >       }).keyBy(0).timeWindow(Time.seconds(1))
> >       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> >       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> >       .keyBy("key")
> >       .asQueryableState(
> >         "type-time-series-count",
> >         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >           "type-time-series-count",
> >           classOf[KeyedDataPoint[java.lang.Integer]]))
> >
> > As you can see, it is a rather simple job, in which I try to count events of
> > different types in windows and then query by event type.
> >
> > In client code I do:
> >     // Query Flink state
> >     val future = client.getKvState(jobId, "type-time-series-count",
> >       key.hashCode, seralizedKey)
> >
> >     // Await async result
> >     val serializedResult: Array[Byte] = Await.result(
> >       future, new FiniteDuration(
> >         10,
> >         duration.SECONDS))
> >
> >     // Deserialize response
> >     val results = deserializeResponse(serializedResult)
> >
> >     results
> >   }
> >
> >   private def deserializeResponse(serializedResult: Array[Byte]):
> >       util.List[KeyedDataPoint[lang.Integer]] = {
> >     KvStateRequestSerializer.deserializeList(serializedResult,
> >       getValueSerializer())
> >   }
> >
> > While debugging the issue I saw that the first element in the list gets
> > deserialized correctly, but deserialization fails on the second one. It
> > seems like the serialized result is broken. Do you have any idea whether I
> > am doing something wrong or whether this is a bug?
> >
> >
> > The exception I get is:
> > java.io.EOFException: null
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> >   at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> >   at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> >
> > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> >
> > I would be grateful for any advice.
> >
> > Regards
> > Dawid Wysakowicz
>
