Hi Velu,

I would recommend switching to Flink 1.4, as queryable state has been 
refactored there to be compatible with all types of state.
You can read more here: 
In addition, a lot of things have been simplified.
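
To make the error in your trace a bit more concrete: the "Unconsumed bytes" message means the value serializer on the client finished reading before the payload was exhausted, i.e. the reader and the writer disagree on the layout. Below is a stdlib-only sketch of that idea. It is not Flink's code or wire format; the class name, method names and byte layout are made up purely for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: hypothetical layout and names, not Flink's serializers.
class UnconsumedBytesDemo {

    // Writer side: every entry is written back to back as (UTF key, long value).
    static byte[] writeEntries(Map<String, Long> entries) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            for (Map.Entry<String, Long> entry : entries.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Matching reader: keeps reading entries until the payload is exhausted.
    static Map<String, Long> readAllEntries(byte[] payload) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
            Map<String, Long> result = new LinkedHashMap<>();
            while (in.available() > 0) {
                String key = in.readUTF();
                result.put(key, in.readLong());
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mismatched reader: expects exactly one entry, then checks for leftovers.
    static long readSingleValue(byte[] payload) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
            in.readUTF(); // skip the key of the first entry
            long value = in.readLong();
            if (in.available() > 0) {
                throw new IllegalStateException(
                    "Unconsumed bytes: reader and writer disagree on the layout");
            }
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> state = new LinkedHashMap<>();
        state.put("apiA", 3L);
        state.put("apiB", 5L);
        byte[] payload = writeEntries(state);

        System.out.println(readAllEntries(payload)); // the matching reader succeeds
        try {
            readSingleValue(payload);                // the mismatched reader fails
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In your client the analogous mismatch would be reading map state with a serializer that expects a different layout than the one used when the state was written.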

And for an example you can use this link: 

which is directly from the Queryable State IT cases.
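
As a rough, untested sketch of what a 1.4 client query for your "mapQuery" state could look like: the class name, the command-line arguments and the timeout are placeholders of mine, the port is the default queryable state proxy port (9069), and I am assuming the flink-queryable-state-client-java dependency is on the client's classpath.

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical class name; a sketch against the Flink 1.4 client API.
public class MapStateQuerySketch {

    public static void main(String[] args) throws Exception {
        String proxyHost = args[0];                  // host of a TaskManager running the state proxy
        JobID jobId = JobID.fromHexString(args[1]);  // id of the running job
        String key = args[2];                        // key whose map state is queried

        // In 1.4 the client talks to the queryable state proxy, not the JobManager.
        QueryableStateClient client = new QueryableStateClient(proxyHost, 9069);

        // Same name and types as the descriptor registered in the job's open().
        MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor<>(
            "mapQuery", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO);

        // Assumes a plain String key; with keyBy(0) the runtime key is a Tuple,
        // so the key and its type information may need adjusting.
        CompletableFuture<MapState<String, Long>> future = client.getKvState(
            jobId, "mapQuery", key, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

        // The returned state is a read-only view of the entries for that key.
        MapState<String, Long> state = future.get(10, TimeUnit.SECONDS);
        for (Map.Entry<String, Long> entry : state.entries()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
    }
}
```

The point is that you no longer serialize keys or deserialize values by hand; the client derives the serializers from the state descriptor.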


> On Jan 22, 2018, at 2:38 PM, Velu Mitwa <velumani.mit2...@gmail.com> wrote:
> Hi,
> I am trying to query Flink's MapState from Flink client (1.3.2). I was able 
> to query ValueState but when I tried to query MapState I am getting an 
> exception. 
> java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
>         at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
>         at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
>         at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
>         at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
>         at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
>         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
>         at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
> Flink Job's Logic
>     FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
>         "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);
>     DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);
>     DataStream<Tuple3<String, String, Long>> outputStream =
>         inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
>             .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
>             .sum(2);
>     DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());
>     output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);
>     // execute program
>     env.execute("Filter Transformation Example");
>   }
>   public static class CreateTuple
>       implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {
>     @Override
>     public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
>       return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
>     }
>   }
>   public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {
>     private transient MapState<String, Long> mapState;
>     @Override
>     public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {
>       mapState.put(input.f1, input.f2);
>     }
>     @Override
>     public void open(Configuration config) {
>       MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
>           "mapQuery", TypeInformation.of(new TypeHint<String>() {
>           }), TypeInformation.of(new TypeHint<Long>() {
>           }));
>       mapStateDesc.setQueryable("mapQuery");
>       mapState = getRuntimeContext().getMapState(mapStateDesc);
>     }
>   }
> Flink Query Client's Logic
>     final JobID jobId = JobID.fromHexString(jobIdParam);
>     String key = queryStateRequestDto.getKey();
>     final Configuration config = new Configuration();
>     config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
>     config.setInteger(JobManagerOptions.PORT, jobManagerPort);
>     HighAvailabilityServices highAvailabilityServices = null;
>     try {
>       highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
>           config, Executors.newSingleThreadScheduledExecutor(),
>           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>     } catch (Exception e) {
>       // TODO Auto-generated catch block
>       e.printStackTrace();
>     }
>     try {
>       QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
>       final TypeSerializer<String> keySerializer = TypeInformation.of(new TypeHint<String>() {
>       }).createSerializer(new ExecutionConfig());
>       final TypeSerializer<Map<String, Long>> valueSerializer =
>           TypeInformation.of(new TypeHint<Map<String, Long>>() {
>           }).createSerializer(new ExecutionConfig());
>       final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(key,
>           keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
>       scala.concurrent.Future<byte[]> serializedResult =
>           client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);
>       // now wait for the result and return it
>       final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
>       byte[] serializedValue = Await.result(serializedResult, duration);
>       Map<String, Long> value =
>           KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
>       System.out.println(value);
>     } catch (Exception e) {
>       // TODO Auto-generated catch block
>       e.printStackTrace();
>     }

