Hey Ufuk,

Did you have a chance to look at that problem?

2017-01-09 10:47 GMT+01:00 Ufuk Celebi <u...@apache.org>:
> Hey Dawid! Thanks for reporting this. I will try to have a look over
> the course of the day. From a first impression, this seems like a bug
> to me.
>
> On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> <wysakowicz.da...@gmail.com> wrote:
> > Hi I was experimenting with the Query State feature and I have some
> > problems querying the state.
> >
> > The code which I use to produce the queryable state is:
> >
> > env.addSource(kafkaConsumer).map(
> >   e => e match {
> >     case LoginClickEvent(_, t) => ("login", 1, t)
> >     case LogoutClickEvent(_, t) => ("logout", 1, t)
> >     case ButtonClickEvent(_, _, t) => ("button", 1, t)
> >   }).keyBy(0).timeWindow(Time.seconds(1))
> >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> >   .keyBy("key")
> >   .asQueryableState(
> >     "type-time-series-count",
> >     new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> >       "type-time-series-count",
> >       classOf[KeyedDataPoint[java.lang.Integer]]))
> >
> > As you see it is a rather simple job, in which I try to count events of
> > different types in windows and then query by event type.
> >
> > In client code I do:
> >
> >   // Query Flink state
> >   val future = client.getKvState(jobId, "type-time-series-count",
> >     key.hashCode, seralizedKey)
> >
> >   // Await async result
> >   val serializedResult: Array[Byte] = Await.result(
> >     future, new FiniteDuration(
> >       10,
> >       duration.SECONDS))
> >
> >   // Deserialize response
> >   val results = deserializeResponse(serializedResult)
> >
> >   results
> > }
> >
> > private def deserializeResponse(serializedResult: Array[Byte]):
> >     util.List[KeyedDataPoint[lang.Integer]] = {
> >   KvStateRequestSerializer.deserializeList(serializedResult,
> >     getValueSerializer())
> > }
> >
> > As I was trying to debug the issue I see the first element in list gets
> > deserialized correctly, but it fails on the second one. It seems like the
> > serialized result is broken. Do you have any idea if I am doing sth wrong or
> > there is some bug?
> >
> > The exception I get is:
> >
> > java.io.EOFException: null
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> >   at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> >   at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> >
> > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> >
> > I would be grateful for any advice.
> >
> > Regards
> > Dawid Wysakowicz