Hey Dawid! Thanks for reporting this. I will try to have a look over the course of the day. From a first impression, this seems like a bug to me.
On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz <wysakowicz.da...@gmail.com> wrote:
> Hi I was experimenting with the Query State feature and I have some problems
> querying the state.
>
> The code which I use to produce the queryable state is:
>
>     env.addSource(kafkaConsumer).map(
>       e => e match {
>         case LoginClickEvent(_, t) => ("login", 1, t)
>         case LogoutClickEvent(_, t) => ("logout", 1, t)
>         case ButtonClickEvent(_, _, t) => ("button", 1, t)
>       }).keyBy(0).timeWindow(Time.seconds(1))
>       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
>       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
>       .keyBy("key")
>       .asQueryableState(
>         "type-time-series-count",
>         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
>           "type-time-series-count",
>           classOf[KeyedDataPoint[java.lang.Integer]]))
>
> As you see it is a rather simple job, in which I try to count events of
> different types in windows and then query by event type.
>
> In client code I do:
>
>     // Query Flink state
>     val future = client.getKvState(jobId, "type-time-series-count",
>       key.hashCode, seralizedKey)
>
>     // Await async result
>     val serializedResult: Array[Byte] = Await.result(
>       future, new FiniteDuration(
>         10,
>         duration.SECONDS))
>
>     // Deserialize response
>     val results = deserializeResponse(serializedResult)
>
>     results
>   }
>
>   private def deserializeResponse(serializedResult: Array[Byte]):
>       util.List[KeyedDataPoint[lang.Integer]] = {
>     KvStateRequestSerializer.deserializeList(serializedResult,
>       getValueSerializer())
>   }
>
> As I was trying to debug the issue I see the first element in list gets
> deserialized correctly, but it fails on the second one. It seems like the
> serialized result is broken. Do you have any idea if I am doing sth wrong or
> there is some bug?
>
> The exception I get is:
>
> java.io.EOFException: null
>     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
>     at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
>     at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
>     at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
>
> You can browse the exact code at: https://github.com/dawidwys/flink-intro
>
> I would be grateful for any advice.
>
> Regards
> Dawid Wysakowicz