Unable to query MapState

2018-01-22 Thread Velu Mitwa
Hi,
I am trying to query Flink's MapState from Flink client (1.3.2). I was able
to query ValueState but when I tried to query MapState I am getting an
exception.

java.io.IOException: Unconsumed bytes in the deserialized value. This
indicates a mismatch in the value serializers used by the KvState instance
and this access.
at
org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
at
com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
at
com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at
org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
at
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at
org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

*Flink Job's Logic*

   * FlinkKafkaConsumer09 consumer = new
FlinkKafkaConsumer09<>(*
*"/apps/application-stream:flink-demo", new MerchantApiSchema(),
properties);*

*DataStream inputEventStream =
env.addSource(consumer);*

*DataStream> outputStream =*
*inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)*
*.window(SlidingProcessingTimeWindows.of(Time.seconds(120),
Time.milliseconds(1000)))*
*.sum(2);*

*DataStream output = outputStream.keyBy(0).flatMap(new
CountEvent());*

*output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);*

*// execute program*
*env.execute("Filter Transformation Example");*

*  }*


*  public static class CreateTuple*
*  implements MapFunction> {*
*@Override*
*public Tuple3 map(MerchantApiEvent input) throws
Exception {*
*  return new Tuple3(input.getMerchantId(),
input.getApiName(), 1L);*
*}*

*  }*

*  public static class CountEvent extends
RichFlatMapFunction, Long> {*

*private transient MapState mapState;*

*@Override*
*public void flatMap(Tuple3 input,
Collector out) throws Exception {*

*  mapState.put(input.f1, input.f2);*

*}*

*@Override*
*public void open(Configuration config) {*

*  MapStateDescriptor mapStateDesc = new
MapStateDescriptor(*
*  "mapQuery", TypeInformation.of(new TypeHint() {*
*  }), TypeInformation.of(new TypeHint() {*
*  }));*
*  mapStateDesc.setQueryable("mapQuery");*

*  mapState = getRuntimeContext().getMapState(mapStateDesc);*

*}*
*  }*


*Flink Query Client's Logic*

*final JobID jobId = JobID.fromHexString(jobIdParam);*

*String key = queryStateRequestDto.getKey();*

*final Configuration config = new Configuration();*
*config.setString(JobManagerOptions.ADDRESS, jobManagerHost);*
*config.setInteger(JobManagerOptions.PORT, jobManagerPort);*

*HighAvailabilityServices highAvailabilityServices = null;*
*try {*
*  highAvailabilityServices =
HighAvailabilityServicesUtils.createHighAvailabilityServices(*
*  config, Executors.newSingleThreadScheduledExecutor(),*
*
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);*
*} catch (Exception e) {*
*  // TODO Auto-generated catch block*
*  e.printStackTrace();*
*}*

*try {*
*  QueryableStateClient client = new QueryableStateClient(config,
highAvailabilityServices);*

*  final TypeSerializer keySerializer = TypeInformation.of(new
TypeHint() {*
*  }).createSerializer(new ExecutionConfig());*
*  final TypeSerializer> valueSerializer =*
*  TypeInformation.of(new TypeHint>() {*
*  }).createSerializer(new ExecutionConfig());*

*  final byte[] serializedKey =
KvStateRequestSerializer.serializeKeyAndNamespace(key,*
*  keySerializer, VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE);*

*  scala.concurrent.Future serializedResult =*
*  client.getKvState(jobId, "mapQuery", 

Queryable State - Count within Time Window

2018-01-08 Thread Velu Mitwa
Hi,
I want to find the number of events happened so far in last 5 minutes and
make that as a Queryable state. Is it possible? It will be of great help if
someone provide some sample code for the same.

Thanks,
Velu.


Re: Queryable State Client - Actor Not found Exception

2018-01-04 Thread Velu Mitwa
Thank you Aljoscha.

I am able to Query state when I use the hostname of Job Manager instead of
its IP Address. But I couldn't understand why it is not working if I give
IP address.

On Thu, Jan 4, 2018 at 6:42 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> Is my-machine:52650 the correct address for the JobManager running in
> YARN? Also, Queryable State in Flink 1.3.2 is not easy to get to work when
> you use YARN with HA mode.
>
> Best,
> Aljoscha
>
> On 3. Jan 2018, at 16:02, Velu Mitwa <velumani.mit2...@gmail.com> wrote:
>
> Hi,
> I am running a Flink Job which uses the Queryable State feature of Apache
> Flink(1.3.2). I was able to do that in local mode. When I try to do that in
> Cluster mode (Yarn Session), I am getting Actor not found Exception.
>
> Please help me to understand what is missing.
>
> *Exception Trace*
>
>
> Query failed because of the following Exception:
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.
> tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:65)
> at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorS
> election.scala:63)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(Ba
> tchingExecutor.scala:55)
> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.
> scala:73)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> unbatchedExecute(Future.scala:74)
> at akka.dispatch.BatchingExecutor$class.execute(
> BatchingExecutor.scala:120)
> at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
> execute(Future.scala:73)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(
> Promise.scala:40)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Pro
> mise.scala:248)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupp
> ort.scala:334)
> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
> at scala.concurrent.Future$InternalCallbackExecutor$.scala$
> concurrent$Future$InternalCallbackExecutor$$unbatchedExecute
> (Future.scala:694)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(
> Future.scala:691)
> at akka.actor.LightArrayRevolverScheduler$TaskHolder.
> executeTask(Scheduler.scala:474)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.
> executeBucket$1(Scheduler.scala:425)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(
> Scheduler.scala:429)
> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(
> Scheduler.scala:381)
> at java.lang.Thread.run(Thread.java:745)
>
> *Client Creation Snippet *
>
> * Configuration config = new Configuration();*
> *config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> jobManagerHost);*
> *config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> jobManagerPort);*
>
> *final HighAvailabilityServices highAvailabilityServices =
> HighAvailabilityServicesUtils*
> *.createHighAvailabilityServices(config,
> Executors.newSingleThreadScheduledExecutor(),*
> *
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);*
>
> *this.client = new QueryableStateClient(config,
> highAvailabilityServices);*
> *  }*
>
>
>
>


Fwd: Queryable State Client - Actor Not found Exception

2018-01-03 Thread Velu Mitwa
Hi,
I am running a Flink Job which uses the Queryable State feature of Apache
Flink(1.3.2). I was able to do that in local mode. When I try to do that in
Cluster mode (Yarn Session), I am getting Actor not found Exception.

Please help me to understand what is missing.

*Exception Trace*


Query failed because of the following Exception:
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.
tcp://flink@my-machine:52650/), Path(/user/jobmanager)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(
ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(
ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(
BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$Batch.run(
BatchingExecutor.scala:73)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.
scala:120)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.
execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.
scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(
Promise.scala:248)
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(
AskSupport.scala:334)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.
scala$concurrent$Future$InternalCallbackExecutor$$
unbatchedExecute(Future.scala:694)
at scala.concurrent.Future$InternalCallbackExecutor$.
execute(Future.scala:691)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(
Scheduler.scala:474)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(
Scheduler.scala:425)
at akka.actor.LightArrayRevolverScheduler$$
anon$8.nextTick(Scheduler.scala:429)
at akka.actor.LightArrayRevolverScheduler$$
anon$8.run(Scheduler.scala:381)
at java.lang.Thread.run(Thread.java:745)

*Client Creation Snippet *

* Configuration config = new Configuration();*
*config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
jobManagerHost);*
*config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
jobManagerPort);*

*final HighAvailabilityServices highAvailabilityServices =
HighAvailabilityServicesUtils*
*.createHighAvailabilityServices(config,
Executors.newSingleThreadScheduledExecutor(),*
*
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);*

*this.client = new QueryableStateClient(config,
highAvailabilityServices);*
*  }*