Hey Dawid! Thanks for reporting this. I will try to have a look over
the course of the day. From a first impression, this seems like a bug
to me.

On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
<wysakowicz.da...@gmail.com> wrote:
> Hi, I was experimenting with the Queryable State feature and I am having
> problems querying the state.
>
> The code which I use to produce the queryable state is:
>
>     env.addSource(kafkaConsumer).map(
>       e => e match {
>         case LoginClickEvent(_, t) => ("login", 1, t)
>         case LogoutClickEvent(_, t) => ("logout", 1, t)
>         case ButtonClickEvent(_, _, t) => ("button", 1, t)
>       }).keyBy(0).timeWindow(Time.seconds(1))
>       .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
>       .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
>       .keyBy("key")
>       .asQueryableState(
>         "type-time-series-count",
>         new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
>           "type-time-series-count",
>           classOf[KeyedDataPoint[java.lang.Integer]]))
>
> As you can see, it is a rather simple job in which I count events of each
> type per window and then query by event type.
>
> In client code I do:
>     // Query Flink state
>     val future = client.getKvState(jobId, "type-time-series-count",
>       key.hashCode, seralizedKey)
>
>     // Await async result
>     val serializedResult: Array[Byte] = Await.result(
>       future, new FiniteDuration(
>         10,
>         duration.SECONDS))
>
>     // Deserialize response
>     val results = deserializeResponse(serializedResult)
>
>     results
>   }
>
>   private def deserializeResponse(serializedResult: Array[Byte]):
>       util.List[KeyedDataPoint[lang.Integer]] = {
>     KvStateRequestSerializer.deserializeList(serializedResult,
>       getValueSerializer())
>   }
>
> While debugging the issue, I saw that the first element in the list gets
> deserialized correctly, but deserialization fails on the second one. It
> seems like the serialized result is broken. Do you have any idea whether I
> am doing something wrong or whether there is a bug?
>
>
> The exception I get is:
> java.io.EOFException: null
>     at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
>     at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
>     at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
>     at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
>
> You can browse the exact code at: https://github.com/dawidwys/flink-intro
>
> I would be grateful for any advice.
>
> Regards
> Dawid Wysakowicz
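
For anyone following along: the shape of the trace above (first element fine,
then an EOFException inside readUTF on the second) is what a framing mismatch
between the writer and the reader of a byte stream typically looks like. The
sketch below only illustrates that general effect with plain java.io streams.
It is not Flink's wire format and not a diagnosis of this report. Point,
FramingMismatchSketch and both methods are hypothetical names made up for the
example, and the one-byte separator is an assumed convention chosen so that
the two sides disagree.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{DataInputStream, DataOutputStream, EOFException}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for KeyedDataPoint; not the class from the repository.
final case class Point(key: String, timestamp: Long, value: Int)

object FramingMismatchSketch {

  // Writer side: elements are written back to back, with no separator byte.
  def writeBackToBack(points: Seq[Point]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    points.foreach { p =>
      out.writeUTF(p.key)        // 2-byte length prefix followed by the string bytes
      out.writeLong(p.timestamp)
      out.writeInt(p.value)
    }
    out.flush()
    bytes.toByteArray
  }

  // Reader side: assumes one separator byte after every element (assumption
  // made for this sketch only). Returns what was decoded plus the error, if any.
  def readWithSeparator(data: Array[Byte]): (Seq[Point], Option[EOFException]) = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val decoded = ArrayBuffer.empty[Point]
    try {
      while (in.available() > 0) {
        decoded += Point(in.readUTF(), in.readLong(), in.readInt())
        // Skipping a byte the writer never wrote shifts every later read by one.
        if (in.available() > 0) in.skipBytes(1)
      }
      (decoded.toList, None)
    } catch {
      case e: EOFException => (decoded.toList, Some(e))
    }
  }
}
```

With two elements, the first one decodes correctly. The reader then skips the
first byte of the second element's length prefix, readUTF interprets the next
two bytes as a much larger string length, and the read runs past the end of
the buffer. A list with a single element decodes without error, which is why
this kind of problem can go unnoticed until a key holds more than one value.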
