Re: custom metrics in elasticsearch ActionRequestFailureHandler

2021-12-03 Thread Lars Bachmann
Hi Alexander,

yes, in the first iteration the use case is to get visibility into failed ES 
requests. Usually we expose metrics to count failures, integrate them into 
dashboards, and set up alerting rules which fire in case they hit a certain 
threshold.

In non-Flink applications which index data into ES we have also applied 
failure handlers which evaluated the kind of error and triggered different 
actions. For example, there are errors from which you can recover (a 
ConnectException caused by transient network issues) - in this case we retried 
the indexing request or eventually even forced a stop of the client until the 
failure was resolved. On the other hand, there can be errors from which you 
cannot recover. In our scenario this was the case for indexing requests with 
malformed data (mapping failures) - here we just ignored the failure and 
dropped the messages.
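
For illustration, such a handler could look roughly like the sketch below. It is
written against the pre-1.15 ActionRequestFailureHandler interface, and the
exception types to match on are assumptions - in practice they are
application-specific:

import java.net.ConnectException;

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;

public class RetryOrDropFailureHandler implements ActionRequestFailureHandler {

    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, ConnectException.class).isPresent()) {
            // Recoverable network problem: re-queue the request so it is retried.
            indexer.add(action);
        } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
            // Malformed document (mapping failure): drop the message by doing nothing.
        } else {
            // Anything else: rethrow and fail the sink.
            throw failure;
        }
    }
}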

I’m not sure if we want to move this logic to the Flink applications as well, 
but I just wanted to mention that there can be situations where you might want 
to control the behavior of your application depending on the failure.

Best Regards,

Lars


> On 03.12.2021 at 09:05, Alexander Preuß wrote:
> 
> Hi Lars,
> 
> What is your use case for the failure handler, just collecting metrics? We 
> want to remove the configurable failure handler in the new Sink API 
> implementation of the Elasticsearch connector in Flink 1.15 because it can be 
> a huge footgun with regards to delivery guarantees.
> 
> Best Regards,
> Alexander
> 
> On Thu, Dec 2, 2021 at 6:23 PM Lars Bachmann wrote:
> Hi David,
> 
> Thanks for the reply. I think metrics are especially important in an 
> error/failure handler in order to have proper monitoring/alerting in such cases. 
> Would be awesome if this could be added to Flink at some point :).
> 
> Regards,
> 
> Lars
> 
>> On 02.12.2021 at 18:13, David Morávek wrote:
>> 
>> Hi Lars,
>> 
>> quickly looking at the ES connector code, I think you're right and there is 
>> no way to do that :(  In general I'd say that being able to expose metrics 
>> is a valid request.
>> 
>> I can imagine having some kind of `RichActionRequestFailureHandler` with 
>> `{get|set}RuntimeContext` methods. More or less the same thing we already do 
>> with for example the `RichFunction`. This unfortunately requires some work 
>> on the Flink side.
>> 
>> cc @Arvid
>> 
>> On Thu, Dec 2, 2021 at 5:52 PM Lars Bachmann wrote:
>> Hi,
>> 
>> is there a way to expose custom metrics within an elasticsearch failure 
>> handler (ActionRequestFailureHandler)? To register custom metrics I need 
>> access to the runtime context but I don't see a way to access the 
>> context in the failure handler.
>> 
>> Thanks and regards,
>> 
>> Lars
> 



Re: custom metrics in elasticsearch ActionRequestFailureHandler

2021-12-03 Thread Alexander Preuß
Hi Lars,

What is your use case for the failure handler, just collecting metrics? We
want to remove the configurable failure handler in the new Sink API
implementation of the Elasticsearch connector in Flink 1.15 because it can
be a huge footgun with regards to delivery guarantees.

Best Regards,
Alexander

On Thu, Dec 2, 2021 at 6:23 PM Lars Bachmann wrote:

> Hi David,
>
> Thanks for the reply. I think metrics are especially important in an
> error/failure handler in order to have proper monitoring/alerting in such
> cases. Would be awesome if this could be added to Flink at some point :).
>
> Regards,
>
> Lars
>
> On 02.12.2021 at 18:13, David Morávek wrote:
>
> Hi Lars,
>
> quickly looking at the ES connector code, I think you're right and there
> is no way to do that :(  In general I'd say that being able to expose
> metrics is a valid request.
>
> I can imagine having some kind of `RichActionRequestFailureHandler` with
> `{get|set}RuntimeContext` methods. More or less the same thing we already
> do with for example the `RichFunction`. This unfortunately requires some
> work on the Flink side.
>
> cc @Arvid
>
> On Thu, Dec 2, 2021 at 5:52 PM Lars Bachmann wrote:
>
>> Hi,
>>
>> is there a way to expose custom metrics within an elasticsearch failure
>> handler (ActionRequestFailureHandler)? To register custom metrics I need
>> access to the runtime context but I don't see a way to access the
>> context in the failure handler.
>>
>> Thanks and regards,
>>
>> Lars
>>
>
>


Re: custom metrics in elasticsearch ActionRequestFailureHandler

2021-12-02 Thread Lars Bachmann
Hi David,

Thanks for the reply. I think metrics are especially important in an 
error/failure handler in order to have proper monitoring/alerting in such cases. 
Would be awesome if this could be added to Flink at some point :).

Regards,

Lars

> On 02.12.2021 at 18:13, David Morávek wrote:
> 
> Hi Lars,
> 
> quickly looking at the ES connector code, I think you're right and there is 
> no way to do that :(  In general I'd say that being able to expose metrics is 
> a valid request.
> 
> I can imagine having some kind of `RichActionRequestFailureHandler` with 
> `{get|set}RuntimeContext` methods. More or less the same thing we already do 
> with for example the `RichFunction`. This unfortunately requires some work on 
> the Flink side.
> 
> cc @Arvid
> 
> On Thu, Dec 2, 2021 at 5:52 PM Lars Bachmann wrote:
> Hi,
> 
> is there a way to expose custom metrics within an elasticsearch failure 
> handler (ActionRequestFailureHandler)? To register custom metrics I need 
> access to the runtime context but I don't see a way to access the 
> context in the failure handler.
> 
> Thanks and regards,
> 
> Lars



Re: custom metrics in elasticsearch ActionRequestFailureHandler

2021-12-02 Thread David Morávek
Hi Lars,

quickly looking at the ES connector code, I think you're right and there is
no way to do that :(  In general I'd say that being able to expose metrics
is a valid request.

I can imagine having some kind of `RichActionRequestFailureHandler` with
`{get|set}RuntimeContext` methods. More or less the same thing we already
do with for example the `RichFunction`. This unfortunately requires some
work on the Flink side.
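
For reference, a rough sketch of what such a purely hypothetical interface could
look like - RichActionRequestFailureHandler does not exist in Flink today:

public interface RichActionRequestFailureHandler extends ActionRequestFailureHandler {

    // Called by the sink before any failure is handled, mirroring RichFunction.
    void setRuntimeContext(RuntimeContext ctx);

    RuntimeContext getRuntimeContext();
}

A handler implementing it could then call getRuntimeContext().getMetricGroup()
inside onFailure to register its counters lazily.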

cc @Arvid

On Thu, Dec 2, 2021 at 5:52 PM Lars Bachmann wrote:

> Hi,
>
> is there a way to expose custom metrics within an elasticsearch failure
> handler (ActionRequestFailureHandler)? To register custom metrics I need
> access to the runtime context but I don't see a way to access the
> context in the failure handler.
>
> Thanks and regards,
>
> Lars
>


Re: Custom metrics in Stateful Functions

2021-04-27 Thread Igal Shilman
Hello Cliff,

You are right, indeed defining custom metrics is not supported at the
moment.
I will file a JIRA issue so we can track this, and we will try to
prioritize this feature.
Meanwhile, there are a lot of metrics that StateFun already defines, like
invocation rates etc.; perhaps you will find them useful [1].

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-3.0/docs/deployment/metrics/

Kind regards,
Igal.


On Tue, Apr 27, 2021 at 5:19 PM Cliff Resnick  wrote:

> We think Embedded Statefun is a nicer fit than Datastream for some problem
> domains, but one thing we miss is support for custom metrics/counters. Is
> there a way to access the Flink support? It looks like if we want custom
> metrics we'll need to roll our own.
>


Re: custom metrics within a Trigger

2021-03-18 Thread Dawid Wysakowicz
Do you mind sharing the code for how you register your metrics with the
TriggerContext? It could help us identify where the name collisions
come from. As far as I am aware it should be fine to use the
TriggerContext for registering metrics.
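
In the meantime, assuming the collision comes from registering the counter on
every onElement call, one workaround is to keep the counter in a transient field
and register it only once per operator instance. A sketch (trigger logic and
metric name are illustrative):

import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class LateEventCountingTrigger extends Trigger<Object, TimeWindow> {

    private transient Counter lateEvents; // not serialized with the trigger

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        if (lateEvents == null) {
            // First call on this trigger instance: register exactly once.
            lateEvents = ctx.getMetricGroup().counter("lateEvents");
        }
        if (timestamp < ctx.getCurrentWatermark()) {
            lateEvents.inc();
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
    }
}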

Best,

Dawid

On 16/03/2021 17:35, Aleksander Sumowski wrote:
> Hi all,
> I'd like to measure how many events arrive within allowed lateness,
> grouped by a particular feature of the event. We assume a particular type
> of events has way more late arrivals and would like to verify this.
> The natural place to make the measurement would be our custom trigger's
> onElement method, as this is the place where we know whether the
> event is late or not. The issue is that the only way to get a
> MetricGroup at the moment is via Trigger.TriggerContext - which leads
> to re-registering and lots of logs:
>
> `Name collision: Group already contains a Metric with the name XXX.
> Metric will not be reported.`
>
> Any hints on how to tackle this?
>
> Thanks,
> Aleksander




Re: Custom metrics output

2020-07-21 Thread Fabian Hueske
Hi Joris,

I don't think that the approach of "add methods in operator class code that
can be called from the main Flink program" will work.

The most efficient approach would be implementing a ProcessFunction that
counts in 1-min time buckets (using event-time semantics) and updates the
metrics.
If you need the metric values to be exact, you can keep the intermediate
counts as operator state.
I would not use a KeyedProcessFunction because you didn't mention a key and
to save the overhead of the shuffle.

You can integrate the ProcessFunctions in different ways in your job.

1) just embed it into the regular flow. The ProcessFunction would just
count and forward every record it receives.
2) fork off a stream of records that just hold the timestamp to a side
output and apply the ProcessFunction on the forked-off stream.

I think the first approach is simpler and more efficient. The
ProcessFunction would be an identity function to your actual data, just
counting and reporting metrics.
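
A minimal sketch of the first approach (metric name and bookkeeping are
illustrative; for exact values you would additionally keep the counts as
operator state, as mentioned above):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class MinuteCountingFunction<T> extends ProcessFunction<T, T> {

    private transient Map<Long, Long> countsPerMinute;
    private transient volatile long lastBucket;

    @Override
    public void open(Configuration parameters) {
        // ConcurrentHashMap because the metric reporter reads from another thread.
        countsPerMinute = new ConcurrentHashMap<>();
        getRuntimeContext().getMetricGroup().gauge("currentMinuteCount",
                (Gauge<Long>) () -> countsPerMinute.getOrDefault(lastBucket, 0L));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // Assumes event-time timestamps have been assigned upstream.
        long minuteBucket = ctx.timestamp() / 60_000L;
        countsPerMinute.merge(minuteBucket, 1L, Long::sum);
        lastBucket = minuteBucket;
        out.collect(value); // identity: forward every record unchanged
    }
}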

Best, Fabian

On Mon, Jul 20, 2020 at 01:30, Joris Geer <joris.van.der.g...@oracle.com>
wrote:

> Hi,
>
> We want to collect metrics for stream processing, typically counts
> aggregated over 1-minute buckets. However, we want these 1-minute
> boundaries determined by timestamps within the data records. Flink metrics
> do not handle this, so we want to roll our own. How should we proceed? Some of our
> team members believe we can add methods in operator class code that can be
> called from the main Flink program, whilst I am not sure this is supposed to
> be possible. Others consider using a side output stream with a record per
> input record and use Flink operators to do the aggregation. That may double
> the amount of records processed.
>
> Can we extend the Flink metrics to provide such aggregation ?
>
> Regards,
>
> Joris
>
>


Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
Thanks for the feedback. I will use elastalert to generate an alarm from
the logs.

On Wed, Jan 22, 2020, 15:03 Chesnay Schepler  wrote:

> It is not possible to access metrics from within a schema.
>
> I can't think of a non-hacky workaround (the hacky one being to create a
> custom kafka consumer that checks the schema class, casts it to your
> specific class, and then calls a method on your schema that accepts a
> metric group).
>
> On 22/01/2020 14:33, David Magalhães wrote:
>
> Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*. Here
> is the constructor of *FlinkKafkaConsumer*. Inside *DeserializationSchema*
> I can't use *getRuntimeContext()*.
>
> FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)
>
> On Wed, Jan 22, 2020 at 3:21 AM Yun Tang  wrote:
>
>> Hi David
>>
>> FlinkKafkaConsumer is itself a RichParallelSourceFunction, and you could
>> call the function below to register your metrics group:
>>
>> getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
>>
>>
>>
>>
>> Best
>> Yun Tang
>> --
>> *From:* David Magalhães 
>> *Sent:* Tuesday, January 21, 2020 3:45
>> *To:* user 
>> *Subject:* Custom Metrics outside RichFunctions
>>
>> Hi, I want to create a custom metric that shows the number of messages
>> that couldn't be deserialized using a custom deserializer inside
>> FlinkKafkaConsumer.
>>
>> Looking into the Metrics page (
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html )
>> that doesn't seem to be possible, because it isn't a RichFunction.
>>
>> Anyone know another way to achieve this?
>>
>> Thanks,
>> David
>>
>
>


Re: Custom Metrics outside RichFunctions

2020-01-22 Thread Chesnay Schepler

It is not possible to access metrics from within a schema.

I can't think of a non-hacky workaround (the hacky one being to create a 
custom kafka consumer that checks the schema class, casts it to your 
specific class, and then calls a method on your schema that accepts a 
metric group).
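
A rough sketch of that hacky variant, spelled out (all class and method names
here are made up for illustration, MyEvent is a placeholder type, and the two
public classes would live in separate files; it relies on the outer field and
the internally wrapped schema staying the same object through Java serialization):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

public class CountingDeserializationSchema implements DeserializationSchema<MyEvent> {

    private transient Counter deserializationFailures;

    public void setMetricGroup(MetricGroup group) {
        deserializationFailures = group.counter("deserializationFailures");
    }

    @Override
    public MyEvent deserialize(byte[] message) {
        try {
            return MyEvent.parse(message);
        } catch (Exception e) {
            deserializationFailures.inc();
            return null; // null records are skipped by the consumer
        }
    }

    @Override
    public boolean isEndOfStream(MyEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<MyEvent> getProducedType() {
        return TypeInformation.of(MyEvent.class);
    }
}

import java.util.List;
import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MetricsAwareKafkaConsumer extends FlinkKafkaConsumer<MyEvent> {

    private final CountingDeserializationSchema countingSchema;

    public MetricsAwareKafkaConsumer(List<String> topics,
                                     CountingDeserializationSchema schema,
                                     Properties props) {
        super(topics, schema, props);
        this.countingSchema = schema; // keep a typed reference to the schema
    }

    @Override
    public void open(Configuration configuration) throws Exception {
        // Hand the metric group to the schema before the source starts consuming.
        countingSchema.setMetricGroup(getRuntimeContext().getMetricGroup());
        super.open(configuration);
    }
}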


On 22/01/2020 14:33, David Magalhães wrote:
Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*.
Here is the constructor of *FlinkKafkaConsumer*. Inside
*DeserializationSchema* I can't use *getRuntimeContext()*.

FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)


On Wed, Jan 22, 2020 at 3:21 AM Yun Tang wrote:


Hi David

FlinkKafkaConsumer is itself a RichParallelSourceFunction, and
you could call the function below to register your metrics group:


getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")




Best
Yun Tang

*From:* David Magalhães <speeddra...@gmail.com>
*Sent:* Tuesday, January 21, 2020 3:45
*To:* user <user@flink.apache.org>
*Subject:* Custom Metrics outside RichFunctions
Hi, I want to create a custom metric that shows the number of
messages that couldn't be deserialized using a custom deserializer
inside FlinkKafkaConsumer.

Looking into the Metrics page (
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html )
that doesn't seem to be possible, because it isn't a RichFunction.

Anyone know another way to achieve this?

Thanks,
David





Re: Custom Metrics outside RichFunctions

2020-01-22 Thread David Magalhães
Hi Yun, I'm trying to use it inside a custom *DeserializationSchema*. Here is
the constructor of *FlinkKafkaConsumer*. Inside *DeserializationSchema* I
can't use *getRuntimeContext()*.

FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props)

On Wed, Jan 22, 2020 at 3:21 AM Yun Tang  wrote:

> Hi David
>
> FlinkKafkaConsumer is itself a RichParallelSourceFunction, and you could
> call the function below to register your metrics group:
>
> getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")
>
>
>
>
> Best
> Yun Tang
> --
> *From:* David Magalhães 
> *Sent:* Tuesday, January 21, 2020 3:45
> *To:* user 
> *Subject:* Custom Metrics outside RichFunctions
>
> Hi, I want to create a custom metric that shows the number of messages that
> couldn't be deserialized using a custom deserializer inside
> FlinkKafkaConsumer.
>
> Looking into the Metrics page (
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html )
> that doesn't seem to be possible, because it isn't a RichFunction.
>
> Anyone know another way to achieve this?
>
> Thanks,
> David
>


Re: Custom Metrics outside RichFunctions

2020-01-21 Thread Yun Tang
Hi David

FlinkKafkaConsumer is itself a RichParallelSourceFunction, and you could call 
the function below to register your metrics group:

getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter")


Best
Yun Tang

From: David Magalhães 
Sent: Tuesday, January 21, 2020 3:45
To: user 
Subject: Custom Metrics outside RichFunctions

Hi, I want to create a custom metric that shows the number of messages that 
couldn't be deserialized using a custom deserializer inside FlinkKafkaConsumer.

Looking into the Metrics page ( 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html 
) that doesn't seem to be possible, because it isn't a RichFunction.

Anyone know another way to achieve this?

Thanks,
David


Re: Custom Metrics in Windowed AggregateFunction

2018-12-19 Thread Chirag Dewan
That was my first alternative actually :)

That works well for per-window metrics. And though it fits perfectly for
smaller windows, it might not be frequent enough for larger window sizes.

Thanks,
Chirag

On Wednesday, 19 December 2018, 4:15:41 PM IST, Dawid Wysakowicz wrote:

Hi Chirag,
 
I am afraid you are right that you cannot access metrics from within an
AggregateFunction in a WindowedStream. You can, though, use the rich variant of
WindowFunction, which is invoked for every window with the results of the
AggregateFunction. Would that be enough for your use case: use
.aggregate(aggregateFunction, windowFunction) and register metrics in the
windowFunction?
 
Best,
 
Dawid
 
On 19/12/2018 04:30, Chirag Dewan wrote:

Hi,

I am writing a Flink job for aggregating events in a window.

I am trying to use the AggregateFunction implementation for this.

Now, since WindowedStream does not allow a RichAggregateFunction for
aggregation, I can't use the RuntimeContext to get the Metric group.

I don't even see any other way of accessing the Metric Group in a non-rich
function implementation?

Is there any way around here?

Any help appreciated.

Thanks,

Chirag

Re: Custom Metrics in Windowed AggregateFunction

2018-12-19 Thread Dawid Wysakowicz
Hi Chirag,

I am afraid you are right that you cannot access metrics from within an
AggregateFunction in a WindowedStream. You can, though, use the rich variant
of WindowFunction, which is invoked for every window with the results of the
AggregateFunction. Would that be enough for your use case: use
.aggregate(aggregateFunction, windowFunction) and register metrics in
the windowFunction?
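
For example, a rich window function along these lines (names are illustrative)
can count fired windows while simply forwarding the pre-aggregated result:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MetricReportingWindowFunction
        extends ProcessWindowFunction<Long, Long, String, TimeWindow> {

    private transient Counter windowsFired;

    @Override
    public void open(Configuration parameters) {
        windowsFired = getRuntimeContext().getMetricGroup().counter("windowsFired");
    }

    @Override
    public void process(String key, Context ctx, Iterable<Long> aggregates,
                        Collector<Long> out) {
        windowsFired.inc();
        // Combined with aggregate(), there is exactly one pre-aggregated value here.
        out.collect(aggregates.iterator().next());
    }
}

It would then be wired in as
.aggregate(new MyAggregateFunction(), new MetricReportingWindowFunction()).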

Best,

Dawid

On 19/12/2018 04:30, Chirag Dewan wrote:
> Hi,
>
> I am writing a Flink job for aggregating events in a window. 
>
> I am trying to use the /AggregateFunction/ implementation for this. 
>
> Now, since WindowedStream does not allow a RichAggregateFunction for
> aggregation, I can't use the RuntimeContext to get the Metric group. 
>
> I don't even see any other way of accessing the Metric Group in a
> non-rich function implementation?
>
> Is there any way around here? 
>
> Any help appreciated.
>
> Thanks,
>
> Chirag




Re: Custom metrics in Watermark Assigner

2018-09-13 Thread vino yang
Hi Oleksandr,

It sounds like a good idea to make a lot of internal information more
transparent. But it would be better if you could be more specific.

Thanks, vino.

Oleksandr Nitavskyi wrote on Wednesday, September 12, 2018 at 9:48 PM:

> Hello guys,
>
>
>
> In our custom AssignerWithPunctuatedWatermarks we want to have custom
> metrics. Unfortunately, the operator wrapping this assigner is
> hidden from the user API. What do you think about adding some optional API in
> order to give users the possibility to register custom metrics in the watermark
> assigner?
>
> This feature would increase the introspectability of custom watermark
> logic, which is sometimes hard to debug.
>
>
>
> Kind regards
>
> Oleksandr
>


Re: Custom metrics reporter classloading problem

2018-07-11 Thread Gyula Fóra
Thanks for the explanation, that makes sense.
For some reason I thought that on YARN everything goes onto the classpath.

Gy

Chesnay Schepler wrote (2018-07-11, Wed, 15:16):

> Reporters do not have access to libraries provided with user-jars.
> They are instantiated when JM/TM starts, i.e. before any user-code is
> even accessible.
>
> My recommendation would be to either put the kafka dependencies in the
> /lib folder or try to relocate the kafka code in the reporter.
>
> On 11.07.2018 14:59, Gyula Fóra wrote:
> > Hi all,
> >
> > I have run into the following problem and I want to double-check
> > whether this is intended behaviour.
> >
> > I have a custom metrics reporter that pushes things to Kafka (so it
> > creates a KafkaProducer in the open method etc.etc.) for my streaming
> job.
> >
> > Naturally, as my Flink job consumes from Kafka, it has the Kafka
> > connector dependencies, so I set the Kafka dependencies to provided in my
> > metric reporter project and put the built kafkaReporter.jar into the
> > Flink lib. However, it seems that the metrics reporter is instantiated
> > without the user code classes since I get a NoClassdefFound error for
> > KafkaProducer even though my streaming job starts successfully
> > reading/writing kafka.
> >
> > Any ideas why this happens and how to solve it? I am slightly against
> > putting the kafka dependencies twice on the classpath as it has only
> > caused problems in the past...
> >
> > Gyula
>
>
>


Re: Custom metrics reporter classloading problem

2018-07-11 Thread Chesnay Schepler

Reporters do not have access to libraries provided with user-jars.
They are instantiated when JM/TM starts, i.e. before any user-code is 
even accessible.


My recommendation would be to either put the kafka dependencies in the 
/lib folder or try to relocate the kafka code in the reporter.


On 11.07.2018 14:59, Gyula Fóra wrote:

Hi all,

I have run into the following problem and I want to double-check 
whether this is intended behaviour.


I have a custom metrics reporter that pushes things to Kafka (so it 
creates a KafkaProducer in the open method etc.etc.) for my streaming job.


Naturally, as my Flink job consumes from Kafka, it has the Kafka 
connector dependencies, so I set the Kafka dependencies to provided in my 
metric reporter project and put the built kafkaReporter.jar into the 
Flink lib. However, it seems that the metrics reporter is instantiated 
without the user code classes since I get a NoClassdefFound error for 
KafkaProducer even though my streaming job starts successfully 
reading/writing kafka.


Any ideas why this happens and how to solve it? I am slightly against 
putting the kafka dependencies twice on the classpath as it has only 
caused problems in the past...


Gyula





Re: Custom Metrics

2017-12-14 Thread Piotr Nowojski
Hi,

> I have a couple more questions related to metrics. I use the InfluxDB reporter to 
> report Flink metrics and I see a lot of metrics being reported. Is there 
> a way to select only a subset of the metrics that we need to monitor the 
> application?

At this point it is up to either the reporter, or the system the metrics are 
reported to. You would need to extend the InfluxDB reporter to add some 
configuration options for ignoring certain metrics.
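
A rough sketch of such an extension - this assumes the InfluxDB reporter you use
follows Flink's MetricReporter contract and can be subclassed, and the
"metrics.filter" config key is made up:

import java.util.regex.Pattern;

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;

public class FilteringInfluxdbReporter extends InfluxdbReporter {

    private Pattern includePattern;

    @Override
    public void open(MetricConfig config) {
        super.open(config);
        includePattern = Pattern.compile(config.getString("metrics.filter", ".*"));
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // Only hand metrics whose name matches the pattern to the underlying reporter.
        if (includePattern.matcher(metricName).matches()) {
            super.notifyOfAddedMetric(metric, metricName, group);
        }
    }
}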

> Also, is there a way to specify a custom metrics scope? Basically I register 
> metrics like below, add a custom metric group and then add a meter per user. 
> I would like this to be reported as measurement "Users" and tagged with the user 
> id. This way I can easily visualize the data in Grafana or any other tool by 
> selecting the measurement and grouping by tag. Is there a way to report like 
> that instead of host, process_type, tm_id, job_name, task_name & 
> subtask_index?


Can you not ignore the first couple of groups/scopes in Grafana? I think you 
can also add more groups in the user scope:

metricGroup.addGroup("Users").addGroup("Foo").addGroup("Bar")
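
Also, assuming a Flink version where MetricGroup#addGroup(key, value) exists
(1.4+, if I am not mistaken), a key/value group makes tag-aware reporters such
as InfluxDB report the user id as a tag rather than as part of the metric name.
A sketch (the Event type and its field are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;

public class UserMeteringFunction extends RichMapFunction<Event, Event> {

    private transient Map<String, Meter> userMeters;

    @Override
    public void open(Configuration parameters) {
        userMeters = new HashMap<>();
    }

    @Override
    public Event map(Event event) {
        userMeters.computeIfAbsent(event.userId, id ->
                getRuntimeContext().getMetricGroup()
                        .addGroup("user", id) // reported as tag user=<id>
                        .meter("events", new DropwizardMeterWrapper(
                                new com.codahale.metrics.Meter())))
                .markEvent();
        return event;
    }
}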

Piotrek

> On 13 Dec 2017, at 22:34, Navneeth Krishnan  wrote:
> 
> Thanks Pitor.
> 
> I have a couple more questions related to metrics. I use the InfluxDB reporter to 
> report Flink metrics and I see a lot of metrics being reported. Is there 
> a way to select only a subset of the metrics that we need to monitor the 
> application?
> 
> Also, is there a way to specify a custom metrics scope? Basically I register 
> metrics like below, add a custom metric group and then add a meter per user. 
> I would like this to be reported as measurement "Users" and tagged with the user 
> id. This way I can easily visualize the data in Grafana or any other tool by 
> selecting the measurement and grouping by tag. Is there a way to report like 
> that instead of host, process_type, tm_id, job_name, task_name & 
> subtask_index?
> 
> metricGroup.addGroup("Users")
>     .meter(userId, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
>
> Thanks a bunch.
> 
> On Mon, Dec 11, 2017 at 11:12 PM, Piotr Nowojski wrote:
> Hi,
> 
> Reporting once per 10 seconds shouldn’t create problems. Best to try it out. 
> Let us know if you get into some troubles :)
> 
> Piotrek
> 
>> On 11 Dec 2017, at 18:23, Navneeth Krishnan wrote:
>> 
>> Thanks Piotr. 
>> 
>> Yes, passing the metric group should be sufficient. The subcomponents will 
>> not be able to provide the list of metrics to register since the metrics are 
>> created based on incoming data by tenant. Also I am planning to have the 
>> metrics reported every 10 seconds and hope it shouldn't be a problem. We use 
>> influx and grafana to plot the metrics.
>> 
>> The option 2 that I had in mind was to collect all metrics and use influx db 
>> sink to report it directly inside the pipeline. But it seems reporting per 
>> node might not be possible.
>> 
>> 
>> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski wrote:
>> Hi,
>> 
>> I’m not sure if I completely understand your issue.
>> 
>> 1.
>> - You don’t have to pass RuntimeContext, you can always pass just the 
>> MetricGroup or ask your components/subclasses “what metrics do you want to 
>> register” and register them at the top level.
>> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for 
>> Flink, as long as you have a reasonable reporting interval. However keep in 
>> mind that Flink only reports your metrics and you still need something to 
>> read/handle/process/aggregate your metrics
>> 2.
>> I don’t think that reporting per node/jvm is possible with Flink’s metric 
>> system. For that you would need some other solution, like report your 
>> metrics using JMX (directly register MBeans from your code)
>> 
>> Piotrek
>> 
>> > On 10 Dec 2017, at 18:51, Navneeth Krishnan wrote:
>> >
>> > Hi,
>> >
>> > I have a streaming pipeline running on flink and I need to collect metrics 
>> > to identify how my algorithm is performing. The entire pipeline is 
>> > multi-tenanted and I also need metrics per tenant. Lets say there would be 
>> > around 20 metrics to be captured per tenant. I have the following ideas 
>> > for implemention but any suggestions on which one might be better will 
>> > help.
>> >
>> > 1. Use flink metric group and register a group per tenant at the operator 
>> > level. The disadvantage of this approach for me is I need the 
>> > runtimecontext parameter to register a metric and I have various 
>> > subclasses to which I need to pass this object to limit the metric scope 
>> > within the operator. Also there will be too many metrics reported if there 
>> > are higher number of subtasks.
>> > How is everyone accessing flink state/metrics from other classes where
>> > you don't have access to runtimecontext?

Re: Custom Metrics

2017-12-13 Thread Navneeth Krishnan
Thanks Pitor.

I have a couple more questions related to metrics. I use the InfluxDB reporter
to report Flink metrics and I see a lot of metrics being reported. Is
there a way to select only a subset of the metrics that we need to monitor the
application?

Also, is there a way to specify a custom metrics scope? Basically I register
metrics like below, add a custom metric group and then add a meter per
user. I would like this to be reported as measurement "Users" and tagged with the
user id. This way I can easily visualize the data in Grafana or any other
tool by selecting the measurement and grouping by tag. Is there a way to
report like that instead of host, process_type, tm_id, job_name, task_name
& subtask_index?

metricGroup.addGroup("Users")
    .meter(userId, new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));

Thanks a bunch.

On Mon, Dec 11, 2017 at 11:12 PM, Piotr Nowojski wrote:

> Hi,
>
> Reporting once per 10 seconds shouldn’t create problems. Best to try it
> out. Let us know if you get into some troubles :)
>
> Piotrek
>
> On 11 Dec 2017, at 18:23, Navneeth Krishnan 
> wrote:
>
> Thanks Piotr.
>
> Yes, passing the metric group should be sufficient. The subcomponents will
> not be able to provide the list of metrics to register since the metrics
> are created based on incoming data by tenant. Also I am planning to have
> the metrics reported every 10 seconds and hope it shouldn't be a problem.
> We use influx and grafana to plot the metrics.
>
> The option 2 that I had in mind was to collect all metrics and use influx
> db sink to report it directly inside the pipeline. But it seems reporting
> per node might not be possible.
>
>
> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure if I completely understand your issue.
>>
>> 1.
>> - You don’t have to pass RuntimeContext, you can always pass just the
>> MetricGroup or ask your components/subclasses “what metrics do you want to
>> register” and register them at the top level.
>> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for
>> Flink, as long as you have a reasonable reporting interval. However keep in
>> mind that Flink only reports your metrics and you still need something to
>> read/handle/process/aggregate your metrics
>> 2.
>> I don’t think that reporting per node/jvm is possible with Flink’s metric
>> system. For that you would need some other solution, like report your
>> metrics using JMX (directly register MBeans from your code)
>>
>> Piotrek
>>
>> > On 10 Dec 2017, at 18:51, Navneeth Krishnan 
>> wrote:
>> >
>> > Hi,
>> >
>> > I have a streaming pipeline running on flink and I need to collect
>> metrics to identify how my algorithm is performing. The entire pipeline is
>> multi-tenanted and I also need metrics per tenant. Lets say there would be
>> around 20 metrics to be captured per tenant. I have the following ideas for
>> implemention but any suggestions on which one might be better will help.
>> >
>> > 1. Use flink metric group and register a group per tenant at the
>> operator level. The disadvantage of this approach for me is I need the
>> runtimecontext parameter to register a metric and I have various subclasses
>> to which I need to pass this object to limit the metric scope within the
>> operator. Also there will be too many metrics reported if there are higher
>> number of subtasks.
>> > How is everyone accessing flink state/ metrics from other classes where
>> you don't have access to runtimecontext?
>> >
>> > 2. Use a custom singleton metric registry to add and send these metrics
>> using custom sink. Instead of using flink metric group to collect metrics
>> per operatior - subtask, collect per jvm and use influx sink to send the
>> metric data. What i'm not sure in this case is how to collect only once per
>> node/jvm.
>> >
>> > Thanks a bunch in advance.
>>
>>
>
>


Re: Custom Metrics

2017-12-11 Thread Piotr Nowojski
Hi,

Reporting once per 10 seconds shouldn’t create problems. Best to try it out. 
Let us know if you get into some troubles :)

Piotrek

> On 11 Dec 2017, at 18:23, Navneeth Krishnan  wrote:
> 
> Thanks Piotr. 
> 
> Yes, passing the metric group should be sufficient. The subcomponents will 
> not be able to provide the list of metrics to register since the metrics are 
> created based on incoming data by tenant. Also I am planning to have the 
> metrics reported every 10 seconds and hope it shouldn't be a problem. We use 
> influx and grafana to plot the metrics.
> 
> The option 2 that I had in mind was to collect all metrics and use influx db 
> sink to report it directly inside the pipeline. But it seems reporting per 
> node might not be possible.
> 
> 
> On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski wrote:
> Hi,
> 
> I’m not sure if I completely understand your issue.
> 
> 1.
> - You don’t have to pass RuntimeContext, you can always pass just the 
> MetricGroup or ask your components/subclasses “what metrics do you want to 
> register” and register them at the top level.
> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for 
> Flink, as long as you have a reasonable reporting interval. However keep in 
> mind that Flink only reports your metrics and you still need something to 
> read/handle/process/aggregate your metrics
> 2.
> I don’t think that reporting per node/jvm is possible with Flink’s metric 
> system. For that you would need some other solution, like report your metrics 
> using JMX (directly register MBeans from your code)
> 
> Piotrek
> 
> > On 10 Dec 2017, at 18:51, Navneeth Krishnan wrote:
> >
> > Hi,
> >
> > I have a streaming pipeline running on flink and I need to collect metrics 
> > to identify how my algorithm is performing. The entire pipeline is 
> > multi-tenanted and I also need metrics per tenant. Lets say there would be 
> > around 20 metrics to be captured per tenant. I have the following ideas for 
> > implemention but any suggestions on which one might be better will help.
> >
> > 1. Use flink metric group and register a group per tenant at the operator 
> > level. The disadvantage of this approach for me is I need the 
> > runtimecontext parameter to register a metric and I have various subclasses 
> > to which I need to pass this object to limit the metric scope within the 
> > operator. Also there will be too many metrics reported if there are higher 
> > number of subtasks.
> > How is everyone accessing flink state/ metrics from other classes where you 
> > don't have access to runtimecontext?
> >
> > 2. Use a custom singleton metric registry to add and send these metrics 
> > using custom sink. Instead of using flink metric group to collect metrics 
> > per operatior - subtask, collect per jvm and use influx sink to send the 
> > metric data. What i'm not sure in this case is how to collect only once per 
> > node/jvm.
> >
> > Thanks a bunch in advance.
> 
> 



Re: Custom Metrics

2017-12-11 Thread Navneeth Krishnan
Thanks Piotr.

Yes, passing the metric group should be sufficient. The subcomponents will
not be able to provide the list of metrics to register since the metrics
are created based on incoming data by tenant. Also I am planning to have
the metrics reported every 10 seconds and hope it shouldn't be a problem.
We use influx and grafana to plot the metrics.

The option 2 that I had in mind was to collect all metrics and use influx
db sink to report it directly inside the pipeline. But it seems reporting
per node might not be possible.


On Mon, Dec 11, 2017 at 3:14 AM, Piotr Nowojski 
wrote:

> Hi,
>
> I’m not sure if I completely understand your issue.
>
> 1.
> - You don’t have to pass RuntimeContext, you can always pass just the
> MetricGroup or ask your components/subclasses “what metrics do you want to
> register” and register them at the top level.
> - Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for
> Flink, as long as you have a reasonable reporting interval. However keep in
> mind that Flink only reports your metrics and you still need something to
> read/handle/process/aggregate your metrics
> 2.
> I don’t think that reporting per node/jvm is possible with Flink’s metric
> system. For that you would need some other solution, like report your
> metrics using JMX (directly register MBeans from your code)
>
> Piotrek
>
> > On 10 Dec 2017, at 18:51, Navneeth Krishnan 
> wrote:
> >
> > Hi,
> >
> > I have a streaming pipeline running on flink and I need to collect
> metrics to identify how my algorithm is performing. The entire pipeline is
> multi-tenanted and I also need metrics per tenant. Lets say there would be
> around 20 metrics to be captured per tenant. I have the following ideas for
> implemention but any suggestions on which one might be better will help.
> >
> > 1. Use flink metric group and register a group per tenant at the
> operator level. The disadvantage of this approach for me is I need the
> runtimecontext parameter to register a metric and I have various subclasses
> to which I need to pass this object to limit the metric scope within the
> operator. Also there will be too many metrics reported if there are higher
> number of subtasks.
> > How is everyone accessing flink state/ metrics from other classes where
> you don't have access to runtimecontext?
> >
> > 2. Use a custom singleton metric registry to add and send these metrics
> using custom sink. Instead of using flink metric group to collect metrics
> per operatior - subtask, collect per jvm and use influx sink to send the
> metric data. What i'm not sure in this case is how to collect only once per
> node/jvm.
> >
> > Thanks a bunch in advance.
>
>


Re: Custom Metrics

2017-12-11 Thread Piotr Nowojski
Hi,

I’m not sure if I completely understand your issue.

1.
- You don’t have to pass RuntimeContext, you can always pass just the 
MetricGroup or ask your components/subclasses “what metrics do you want to 
register” and register them at the top level.
- Reporting tens/hundreds/thousands of metrics shouldn’t be an issue for Flink, 
as long as you have a reasonable reporting interval. However keep in mind that 
Flink only reports your metrics and you still need something to 
read/handle/process/aggregate your metrics
2.
I don’t think that reporting per node/jvm is possible with Flink’s metric 
system. For that you would need some other solution, like report your metrics 
using JMX (directly register MBeans from your code)
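
A minimal sketch of that direction with a standard MBean (object name and
interface are made up; note the management interface must be public and named
<Class>MBean, so the two types go into separate files):

import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;

public interface NodeMetricsMBean {
    long getProcessedRecords();
}

public class NodeMetrics implements NodeMetricsMBean {

    private static volatile NodeMetrics instance;

    private final AtomicLong processedRecords = new AtomicLong();

    // Guard so the MBean is registered only once per JVM, even with many subtasks.
    public static synchronized NodeMetrics registerOnce() throws Exception {
        if (instance == null) {
            instance = new NodeMetrics();
            ManagementFactory.getPlatformMBeanServer().registerMBean(
                    instance, new ObjectName("myapp:type=NodeMetrics"));
        }
        return instance;
    }

    public void inc() {
        processedRecords.incrementAndGet();
    }

    @Override
    public long getProcessedRecords() {
        return processedRecords.get();
    }
}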

Piotrek

> On 10 Dec 2017, at 18:51, Navneeth Krishnan  wrote:
> 
> Hi,
> 
> I have a streaming pipeline running on flink and I need to collect metrics to 
> identify how my algorithm is performing. The entire pipeline is 
> multi-tenanted and I also need metrics per tenant. Lets say there would be 
> around 20 metrics to be captured per tenant. I have the following ideas for 
> implemention but any suggestions on which one might be better will help.
> 
> 1. Use flink metric group and register a group per tenant at the operator 
> level. The disadvantage of this approach for me is I need the runtimecontext 
> parameter to register a metric and I have various subclasses to which I need 
> to pass this object to limit the metric scope within the operator. Also there 
> will be too many metrics reported if there are higher number of subtasks. 
> How is everyone accessing flink state/ metrics from other classes where you 
> don't have access to runtimecontext?
> 
> 2. Use a custom singleton metric registry to add and send these metrics using 
> custom sink. Instead of using flink metric group to collect metrics per 
> operatior - subtask, collect per jvm and use influx sink to send the metric 
> data. What i'm not sure in this case is how to collect only once per node/jvm.
> 
> Thanks a bunch in advance.