Thanks Jiayi, I updated the client code to use the key of the keyed stream. The key is a Tuple2<UUID, String>:

CompletableFuture<MapState<UUID, Rule>> resultFuture =
    client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
        Tuple2.of(uuid, "test"), TypeInformation.of(new TypeHint<Tuple2<UUID, String>>() { }), descriptor);

I'm now getting a different exception. I'm NOT using Avro as a custom serializer, so I'm not sure what causes this issue.

Caused by: java.lang.RuntimeException: Error while processing request with ID 21. Caused by: java.lang.UnsupportedOperationException: Could not find required Avro dependency.
    at org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass.read(Serializers.java:170)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
    at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
    at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
    at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

    at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
    at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

    at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.queryablestate.client.proxy.KvStateClientProxyHandler.lambda$executeActionAsync$0(KvStateClientProxyHandler.java:146)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

Jayant Ameta

On Tue, Nov 13, 2018 at 11:35 AM bupt_ljy <bupt_...@163.com> wrote:

> Hi, Jayant
>
> The key you pass to the getKvState function should be the key of the
> keyed stream, not the key of the map. From what I’ve seen on
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html,
> this feature only supports managed keyed state.
>
> By the way, I think we should improve the error message that Jayant
> ran into.
>
> Best,
> Jiayi Liao
>
> Original Message
> *Sender:* Jayant Ameta<wittyam...@gmail.com>
> *Recipient:* trohrmann<trohrm...@apache.org>
> *Cc:* bupt_ljy<bupt_...@163.com>; Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<user@flink.apache.org>
> *Date:* Tuesday, Nov 13, 2018 13:39
> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>
> Hi Till,
> Here is the client snippet. Here Rule is a custom POJO that I use.
>
> public static void main(String[] args)
>         throws IOException, InterruptedException, ExecutionException {
>     UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
>
>     QueryableStateClient client = new QueryableStateClient("127.0.1.1", 9069);
>     ExecutionConfig config = new ExecutionConfig();
>     client.setExecutionConfig(config);
>
>     MapStateDescriptor<UUID, Rule> descriptor =
>         new MapStateDescriptor<>("rulePatterns", UUID.class, Rule.class);
>     CompletableFuture<MapState<UUID, Rule>> resultFuture =
>         client.getKvState(JobID.fromHexString("337f4476388fabc6f29bb4fb9107082c"), "rules",
>             uuid, TypeInformation.of(UUID.class), descriptor);
>
>     while (!resultFuture.isDone()) {
>         Thread.sleep(1000);
>     }
>     resultFuture.whenComplete((result, throwable) -> {
>         if (throwable != null) {
>             throwable.printStackTrace();
>         } else {
>             try {
>                 System.out.println(result.get(uuid));
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>     });
> }
>
>
> Below is the stack trace:
>
> Caused by: java.lang.RuntimeException: Error while processing request with ID 12. Caused by: java.io.IOException: Unable to deserialize key and namespace. This indicates a mismatch in the key/namespace serializers used by the KvState instance and this access.
>     at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:107)
>     at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.EOFException
>     at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:307)
>     at org.apache.flink.types.StringValue.readString(StringValue.java:770)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:69)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>     ... 9 more
>
>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:98)
>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
>
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.lambda$run$0(AbstractServerHandler.java:266)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:778)
>     at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2140)
>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:748)
>
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:324)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onRequestFailure(Client.java:563)
>     at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:84)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     at java.lang.Thread.run(Thread.java:748)
>
> Jayant Ameta
>
>
> On Fri, Nov 9, 2018 at 5:14 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Could you send us a small example program which we can use to reproduce
>> the problem?
>>
>> Cheers,
>> Till
>>
>> On Fri, Nov 9, 2018 at 6:57 AM Jayant Ameta <wittyam...@gmail.com> wrote:
>>
>>> Yeah, it IS using the Kryo serializer.
>>>
>>> Jayant Ameta
>>>
>>>
>>> On Wed, Nov 7, 2018 at 9:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Jayant, could you check that the UUID key on the TM is actually
>>>> serialized using a Kryo serializer? You can do this by setting a breakpoint
>>>> in the constructor of the `AbstractKeyedStateBackend`.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Tue, Oct 30, 2018 at 9:44 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>
>>>>> Hi, Jayant
>>>>>
>>>>> Your code looks good to me. I’ve also tried Kryo
>>>>> serialization/deserialization on the UUID class, and it all looks okay.
>>>>>
>>>>> I’m not sure about this problem. Maybe you can write a very
>>>>> simple demo to see whether it works.
>>>>>
>>>>>
>>>>> Jiayi Liao, Best
>>>>>
>>>>> Original Message
>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<user@flink.apache.org>
>>>>> *Date:* Monday, Oct 29, 2018 11:53
>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>>>
>>>>> Hi Jiayi,
>>>>> Any further help on this?
>>>>>
>>>>> Jayant Ameta
>>>>>
>>>>>
>>>>> On Fri, Oct 26, 2018 at 9:22 AM Jayant Ameta <wittyam...@gmail.com> wrote:
>>>>>
>>>>>> MapStateDescriptor<UUID, String> descriptor =
>>>>>>     new MapStateDescriptor<>("rulePatterns", UUID.class, String.class);
>>>>>>
>>>>>> Jayant Ameta
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 26, 2018 at 8:19 AM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Can you show us the descriptor used in the code below?
>>>>>>>
>>>>>>> client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>     UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>     TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>
>>>>>>>
>>>>>>> Jiayi Liao, Best
>>>>>>>
>>>>>>>
>>>>>>> Original Message
>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>> *Recipient:* bupt_ljy<bupt_...@163.com>
>>>>>>> *Cc:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>; user<user@flink.apache.org>
>>>>>>> *Date:* Friday, Oct 26, 2018 02:26
>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>>>>>
>>>>>>> Also, I haven't provided any custom serializer in my Flink job.
>>>>>>> Shouldn't the same configuration work for the queryable state client?
>>>>>>>
>>>>>>> Jayant Ameta
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 25, 2018 at 4:15 PM Jayant Ameta <wittyam...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Gordon,
>>>>>>>> Following is the stack trace that I'm getting:
>>>>>>>>
>>>>>>>> Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 0.
>>>>>>>>  Caused by: java.lang.RuntimeException: Failed request 0.
>>>>>>>>  Caused by: java.lang.RuntimeException: Error while processing request with ID 0. Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: -985346241
>>>>>>>> Serialization trace:
>>>>>>>> $outer (scala.collection.convert.Wrappers$SeqWrapper)
>>>>>>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
>>>>>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:249)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>>>     at org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer.deserializeKeyAndNamespace(KvStateSerializer.java:94)
>>>>>>>>     at org.apache.flink.runtime.state.heap.AbstractHeapState.getSerializedValue(AbstractHeapState.java:93)
>>>>>>>>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:87)
>>>>>>>>     at org.apache.flink.queryablestate.server.KvStateServerHandler.handleRequest(KvStateServerHandler.java:49)
>>>>>>>>     at org.apache.flink.queryablestate.network.AbstractServerHandler$AsyncRequestTask.run(AbstractServerHandler.java:229)
>>>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>
>>>>>>>> I am not using any custom serializer as mentioned by Jiayi.
>>>>>>>>
>>>>>>>> Jayant Ameta
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 25, 2018 at 3:01 PM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Jayant,
>>>>>>>>>
>>>>>>>>> There should be a Serializer parameter in the constructor of the
>>>>>>>>> StateDescriptor. You should create a new serializer like this:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> new GenericTypeInfo(classOf[UUID]).createSerializer(env.getConfig)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> By the way, can you show us your Kryo exception, as Gordon asked?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Original Message
>>>>>>>>> *Sender:* Tzu-Li (Gordon) Tai<tzuli...@apache.org>
>>>>>>>>> *Recipient:* Jayant Ameta<wittyam...@gmail.com>; bupt_ljy<bupt_...@163.com>
>>>>>>>>> *Cc:* user<user@flink.apache.org>
>>>>>>>>> *Date:* Thursday, Oct 25, 2018 17:18
>>>>>>>>> *Subject:* Re: Queryable state when key is UUID - getting Kyro Exception
>>>>>>>>>
>>>>>>>>> Hi Jayant,
>>>>>>>>>
>>>>>>>>> What is the Kryo exception message that you are getting?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Gordon
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 25 October 2018 at 5:17:13 PM, Jayant Ameta (wittyam...@gmail.com) wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I've not configured any serializer in the descriptor (neither in
>>>>>>>>> the Flink job nor in the state query client).
>>>>>>>>> Which serializer should I use?
>>>>>>>>>
>>>>>>>>> Jayant Ameta
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 25, 2018 at 2:13 PM bupt_ljy <bupt_...@163.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> It seems that your code is right. Are you sure that you’re
>>>>>>>>>> using the same Serializer as the Flink program does? Could you show the
>>>>>>>>>> serializer in the descriptor?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Jiayi Liao, Best
>>>>>>>>>>
>>>>>>>>>> Original Message
>>>>>>>>>> *Sender:* Jayant Ameta<wittyam...@gmail.com>
>>>>>>>>>> *Recipient:* user<user@flink.apache.org>
>>>>>>>>>> *Date:* Thursday, Oct 25, 2018 14:17
>>>>>>>>>> *Subject:* Queryable state when key is UUID - getting Kyro Exception
>>>>>>>>>>
>>>>>>>>>> I get a Kryo exception when querying the state.
>>>>>>>>>>
>>>>>>>>>> Key: UUID
>>>>>>>>>> MapState<UUID, String>
>>>>>>>>>>
>>>>>>>>>> Client code snippet:
>>>>>>>>>>
>>>>>>>>>> CompletableFuture<MapState<UUID, String>> resultFuture =
>>>>>>>>>>     client.getKvState(JobID.fromHexString("c7b8af14b8afacf4fac16cdd0da7e997"), "rule",
>>>>>>>>>>         UUID.fromString("3b3f17a0-d81a-11e8-bb91-7fd1412de84d"),
>>>>>>>>>>         TypeInformation.of(new TypeHint<UUID>() {}), descriptor);
>>>>>>>>>> MapState<UUID, String> mapState = resultFuture.get(10, TimeUnit.SECONDS);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Any better way to query it?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Jayant Ameta
>>>>>>>>>>
>>>>>>>>>
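
A note on the "Unable to deserialize key and namespace" trace quoted above (request ID 12): its root cause is a java.io.EOFException raised under StringSerializer.deserialize, called from TupleSerializer.deserialize. That fits Jiayi's point about the key. The server side was reading a (UUID, String) key, while the client had sent a bare UUID, so the read of the String field ran out of bytes. Below is a minimal sketch of that failure mode using only java.io streams. Everything in it is an assumption made for illustration: the class KeyMismatchSketch, its method names, and the "two longs plus a UTF string" encoding are hypothetical and are not Flink's or Kryo's actual wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;

/** Toy model of a key-serializer mismatch. NOT Flink's real serialization format. */
public class KeyMismatchSketch {

    /** Client side, first attempt: the key is written as a bare UUID (two longs). */
    public static byte[] writeUuidKey(UUID id) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Client side, after the fix: the key is written as (UUID, String), like the keyed stream's key. */
    public static byte[] writeTupleKey(UUID id, String tag) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.getMostSignificantBits());
            out.writeLong(id.getLeastSignificantBits());
            out.writeUTF(tag);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Server side: always expects (UUID, String). Returns the decoded key or a failure description. */
    public static String readTupleKey(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            UUID id = new UUID(in.readLong(), in.readLong());
            // readUTF throws EOFException when the String field is missing from the input.
            String tag = in.readUTF();
            return "(" + id + ", " + tag + ")";
        } catch (EOFException e) {
            return "EOFException: key bytes ended before the String field";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        UUID uuid = UUID.fromString("2ba14b80-e6ff-11e8-908b-9bd8fd37bffb");
        // Mismatched: a UUID-only key read by a (UUID, String) reader.
        System.out.println(readTupleKey(writeUuidKey(uuid)));
        // Matched: a (UUID, String) key read by a (UUID, String) reader.
        System.out.println(readTupleKey(writeTupleKey(uuid, "test")));
    }
}
```

This only models the EOFException trace. The later "Could not find required Avro dependency" error is raised inside KryoSerializer.deserialize according to its own trace, so it is a separate question that this sketch does not address.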