Unable to query MapState
Hi,

I am trying to query Flink's MapState from Flink client (1.3.2). I was able to query ValueState but when I tried to query MapState I am getting an exception.

java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
    at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
    at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
    at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

Flink Job's Logic

    FlinkKafkaConsumer09 consumer = new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);
    DataStream inputEventStream = env.addSource(consumer);
    DataStream> outputStream =
        inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
            .sum(2);
    DataStream output = outputStream.keyBy(0).flatMap(new CountEvent());
    output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);
    // execute program
    env.execute("Filter Transformation Example");
    }

    public static class CreateTuple
        implements MapFunction > {
      @Override
      public Tuple3 map(MerchantApiEvent input) throws Exception {
        return new Tuple3 (input.getMerchantId(), input.getApiName(), 1L);
      }
    }

    public static class CountEvent extends RichFlatMapFunction , Long> {
      private transient MapState mapState;

      @Override
      public void flatMap(Tuple3 input, Collector out) throws Exception {
        mapState.put(input.f1, input.f2);
      }

      @Override
      public void open(Configuration config) {
        MapStateDescriptor mapStateDesc = new MapStateDescriptor (
            "mapQuery", TypeInformation.of(new TypeHint() {
            }), TypeInformation.of(new TypeHint() {
            }));
        mapStateDesc.setQueryable("mapQuery");
        mapState = getRuntimeContext().getMapState(mapStateDesc);
      }
    }

Flink Query Client's Logic

    final JobID jobId = JobID.fromHexString(jobIdParam);
    String key = queryStateRequestDto.getKey();
    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);
    HighAvailabilityServices highAvailabilityServices = null;
    try {
      highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
          config, Executors.newSingleThreadScheduledExecutor(),
          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    try {
      QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
      final TypeSerializer keySerializer = TypeInformation.of(new TypeHint() {
      }).createSerializer(new ExecutionConfig());
      final TypeSerializer
Queryable State - Count within Time Window
Hi,

I want to count the events that have occurred in the last 5 minutes and expose that count as queryable state. Is this possible? It would be a great help if someone could provide some sample code for this.

Thanks,
Velu.
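To make the counting part of the question concrete, here is a minimal sketch of a trailing 5-minute event counter in plain Java. It is not Flink API and not a confirmed solution from this thread: the class and method names (SlidingEventCounter, record, countAt) are hypothetical, and it assumes timestamps arrive in non-decreasing order. In a Flink job the resulting count would presumably live in keyed state whose descriptor is made queryable with setQueryable, as in the MapState message in this archive.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: counts events whose timestamps fall in a trailing window. */
class SlidingEventCounter {
    private final long windowMillis;
    // Event timestamps, oldest first (assumes non-decreasing arrival order).
    private final Deque<Long> timestamps = new ArrayDeque<>();

    SlidingEventCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records one event at the given time in milliseconds. */
    void record(long eventTimeMillis) {
        timestamps.addLast(eventTimeMillis);
    }

    /**
     * Returns how many recorded events are newer than (nowMillis - windowMillis).
     * Assumes no event later than nowMillis has been recorded.
     */
    int countAt(long nowMillis) {
        long cutoff = nowMillis - windowMillis;
        // Drop events that have aged out of the window.
        while (!timestamps.isEmpty() && timestamps.peekFirst() <= cutoff) {
            timestamps.removeFirst();
        }
        return timestamps.size();
    }

    public static void main(String[] args) {
        SlidingEventCounter counter = new SlidingEventCounter(5 * 60 * 1000L);
        counter.record(0L);
        counter.record(60_000L);
        counter.record(240_000L);
        System.out.println(counter.countAt(240_000L)); // all three events are inside the window
        System.out.println(counter.countAt(330_000L)); // the event at t=0 has aged out
    }
}
```

The deque keeps memory proportional to the number of events in the window; a high-volume stream would more likely use pre-aggregated buckets or a sliding window operator instead.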
Re: Queryable State Client - Actor Not found Exception
Thank you Aljoscha.

I am able to query state when I use the hostname of the Job Manager instead of its IP address. But I don't understand why it does not work when I give the IP address.

On Thu, Jan 4, 2018 at 6:42 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> Is my-machine:52650 the correct address for the JobManager running in
> YARN? Also, Queryable State in Flink 1.3.2 is not easy to get to work when
> you use YARN with HA mode.
>
> Best,
> Aljoscha
>
> On 3. Jan 2018, at 16:02, Velu Mitwa <velumani.mit2...@gmail.com> wrote:
>
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in
> Cluster mode (Yarn Session), I am getting Actor not found Exception.
>
> Please help me to understand what is missing.
>
> Exception Trace
>
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
>     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>     at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>     at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
>     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>     at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
>     at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>     at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
>     at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
>     at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
>     at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
>     at java.lang.Thread.run(Thread.java:745)
>
> Client Creation Snippet
>
>     Configuration config = new Configuration();
>     config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
>
>     final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
>         .createHighAvailabilityServices(config, Executors.newSingleThreadScheduledExecutor(),
>             HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>
>     this.client = new QueryableStateClient(config, highAvailabilityServices);
>     }
Fwd: Queryable State Client - Actor Not found Exception
Hi,

I am running a Flink Job which uses the Queryable State feature of Apache Flink(1.3.2). I was able to do that in local mode. When I try to do that in Cluster mode (Yarn Session), I am getting Actor not found Exception.

Please help me to understand what is missing.

Exception Trace

Query failed because of the following Exception:
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
    at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:73)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
    at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:120)
    at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:334)
    at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
    at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:474)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:425)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:429)
    at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:381)
    at java.lang.Thread.run(Thread.java:745)

Client Creation Snippet

    Configuration config = new Configuration();
    config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
    config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);

    final HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils
        .createHighAvailabilityServices(config, Executors.newSingleThreadScheduledExecutor(),
            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

    this.client = new QueryableStateClient(config, highAvailabilityServices);
    }