Hi Velu,

I would recommend switching to Flink 1.4, as queryable state has been refactored there to be compatible with all types of state. You can read more here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html

In addition, a lot of things have been simplified.
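As for the exception itself, here is a minimal, self-contained sketch of how a check like "Unconsumed bytes in the deserialized value" can trip. It assumes the map state arrives as entries written back to back, and that the reader decodes a single value and then expects the buffer to be empty. The class name UnconsumedBytesDemo, its methods, and the byte layout are all made up for illustration; this is not Flink's actual wire format or API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a serializer mismatch; not Flink code.
public class UnconsumedBytesDemo {

    // Writes every entry back to back: key length, key bytes, 8-byte value.
    public static byte[] writeEntries(Map<String, Long> map) {
        int size = 0;
        for (String key : map.keySet()) {
            size += Integer.BYTES + key.getBytes(StandardCharsets.UTF_8).length + Long.BYTES;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (Map.Entry<String, Long> e : map.entrySet()) {
            byte[] key = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putInt(key.length).put(key).putLong(e.getValue());
        }
        return buf.array();
    }

    // Decodes one entry from the current buffer position.
    private static Map.Entry<String, Long> readEntry(ByteBuffer buf) {
        byte[] key = new byte[buf.getInt()];
        buf.get(key);
        long value = buf.getLong();
        return new AbstractMap.SimpleEntry<>(new String(key, StandardCharsets.UTF_8), value);
    }

    // Models a single-value read: decode once, then insist that nothing is left over.
    public static Map.Entry<String, Long> readSingle(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        Map.Entry<String, Long> entry = readEntry(buf);
        if (buf.hasRemaining()) {
            throw new IllegalStateException("Unconsumed bytes in the deserialized value.");
        }
        return entry;
    }

    // Models a map-aware read: keep decoding entries until the buffer is exhausted.
    public static Map<String, Long> readEntries(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        Map<String, Long> result = new LinkedHashMap<>();
        while (buf.hasRemaining()) {
            Map.Entry<String, Long> entry = readEntry(buf);
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new LinkedHashMap<>();
        state.put("apiA", 3L);
        state.put("apiB", 5L);
        byte[] bytes = writeEntries(state);

        // The map-aware reader recovers both entries.
        System.out.println(readEntries(bytes));

        // The single-value reader stops after the first entry and rejects the rest.
        try {
            readSingle(bytes);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Reading the same bytes entry by entry succeeds in this model, which is the kind of map-aware handling a client needs when the state is a MapState rather than a ValueState.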
And for an example you can use this link:
https://github.com/apache/flink/blob/a3fd548e9c76c67609bbf159d5fb743d756450b1/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java#L804
which is directly from the Queryable State IT cases.

Thanks,
Kostas

> On Jan 22, 2018, at 2:38 PM, Velu Mitwa <velumani.mit2...@gmail.com> wrote:
>
> Hi,
> I am trying to query Flink's MapState from Flink client (1.3.2). I was able
> to query ValueState but when I tried to query MapState I am getting an
> exception.
>
> java.io.IOException: Unconsumed bytes in the deserialized value. This
> indicates a mismatch in the value serializers used by the KvState instance
> and this access.
>     at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
>     at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
>     at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
>     at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
>     at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
>     at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
>     at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
>
> Flink Job's Logic
>
>     FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
>         "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);
>
>     DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);
>
>     DataStream<Tuple3<String, String, Long>> outputStream =
>         inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
>             .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
>             .sum(2);
>
>     DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());
>
>     output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);
>
>     // execute program
>     env.execute("Filter Transformation Example");
>
>   }
>
>
>   public static class CreateTuple
>       implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {
>     @Override
>     public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
>       return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
>     }
>
>   }
>
>   public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {
>
>     private transient MapState<String, Long> mapState;
>
>     @Override
>     public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {
>
>       mapState.put(input.f1, input.f2);
>
>     }
>
>     @Override
>     public void open(Configuration config) {
>
>       MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
>           "mapQuery", TypeInformation.of(new TypeHint<String>() {
>           }), TypeInformation.of(new TypeHint<Long>() {
>           }));
>       mapStateDesc.setQueryable("mapQuery");
>
>       mapState = getRuntimeContext().getMapState(mapStateDesc);
>
>     }
>   }
>
>
> Flink Query Client's Logic
>
>     final JobID jobId = JobID.fromHexString(jobIdParam);
>
>     String key = queryStateRequestDto.getKey();
>
>     final Configuration config = new Configuration();
>     config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
>     config.setInteger(JobManagerOptions.PORT, jobManagerPort);
>
>     HighAvailabilityServices highAvailabilityServices = null;
>     try {
>       highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
>           config, Executors.newSingleThreadScheduledExecutor(),
>           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>     } catch (Exception e) {
>       // TODO Auto-generated catch block
>       e.printStackTrace();
>     }
>
>     try {
>       QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
>
>       final TypeSerializer<String> keySerializer = TypeInformation.of(new TypeHint<String>() {
>       }).createSerializer(new ExecutionConfig());
>       final TypeSerializer<Map<String, Long>> valueSerializer =
>           TypeInformation.of(new TypeHint<Map<String, Long>>() {
>           }).createSerializer(new ExecutionConfig());
>
>       final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(key,
>           keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
>
>       scala.concurrent.Future<byte[]> serializedResult =
>           client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);
>
>       // now wait for the result and return it
>       final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
>       byte[] serializedValue = Await.result(serializedResult, duration);
>       Map<String, Long> value =
>           KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
>       System.out.println(value);
>     } catch (Exception e) {
>       // TODO Auto-generated catch block
>       e.printStackTrace();
>     }