Hi Becket,

Thanks for the answer :)

> By time-based metric, I meant the portion of time spent on producing the
> record to downstream. For example, a source connector can report that it's
> spending 80% of time to emit record to downstream processing pipeline. In
> another case, a sink connector may report that it's spending 30% of time
> producing the records to the external system.
> 
> This is in some sense equivalent to the buffer usage metric:
>   - 80% of time spent on emitting records to downstream ---> downstream
> node is bottleneck ---> output buffer is probably full.
>   - 30% of time spent on emitting records to downstream ---> downstream
> node is not bottleneck ---> output buffer is probably not full.

If by “time spent on emitting records to downstream” you mean “waiting on
back pressure”, then I see your point, and I agree that some kind of
ratio/time-based metric gives you more information. However, “time spent on
emitting records to downstream” might hide the following (extreme) situation:

1. The job is barely able to handle the influx of records: there is 99%
CPU/resource usage in the cluster, but nobody is bottlenecked/back pressured,
all output buffers are empty, and everybody spends 1% of its time waiting for
more records to process.
2. 80% of the time can still be spent on “downstream operators”, because they
are the CPU-intensive operations, but this doesn’t mean that increasing the
parallelism downstream will help with anything. On the contrary, increasing
the parallelism of the source operator might help to increase resource
utilisation up to 100%.

However, this “time based/ratio” approach can be extended to input/output
buffer usage. Besides collecting the information whether an input/output
buffer is full/empty, we can probe (profile) how often the buffers are
empty/full. If the output buffer is full 1% of the time, there is almost no
back pressure. If it’s full 80% of the time, there is some back pressure; if
it’s full 99.9% of the time, there is huge back pressure.

Now, for autoscaling you could compare the fill ratios of the input & output
buffers:

1. Both are high: the source of the bottleneck is downstream.
2. Output is low, input is high: this operator/task is the bottleneck, and the
higher the difference, the bigger a source of bottleneck it is.
3. Output is high, input is low: there was some load spike that we are
currently finishing processing.
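As a rough sketch of how an autoscaler could encode the three cases above: the class name, the enum and the 0.8 cut-off between "low" and "high" are all assumptions made for illustration, and the fourth verdict (both ratios low) is not one of the cases listed above but is needed to make the function total.

```java
/** Hypothetical helper (not part of Flink): maps buffer fill ratios to the cases above. */
final class BottleneckHeuristic {

    enum Verdict {
        BOTTLENECK_IS_DOWNSTREAM, // case 1: input high, output high
        THIS_TASK_IS_BOTTLENECK,  // case 2: input high, output low
        FINISHING_LOAD_SPIKE,     // case 3: input low, output high
        NO_BACK_PRESSURE          // both low; added here for completeness
    }

    /** Assumed cut-off between "low" and "high"; would need tuning in practice. */
    static final double HIGH = 0.8;

    static Verdict classify(double inputFillRatio, double outputFillRatio) {
        boolean inputHigh = inputFillRatio >= HIGH;
        boolean outputHigh = outputFillRatio >= HIGH;
        if (inputHigh && outputHigh) {
            return Verdict.BOTTLENECK_IS_DOWNSTREAM;
        }
        if (inputHigh) {
            return Verdict.THIS_TASK_IS_BOTTLENECK;
        }
        if (outputHigh) {
            return Verdict.FINISHING_LOAD_SPIKE;
        }
        return Verdict.NO_BACK_PRESSURE;
    }

    /** For case 2: the larger the input/output gap, the stronger the bottleneck. */
    static double severity(double inputFillRatio, double outputFillRatio) {
        return Math.max(0.0, inputFillRatio - outputFillRatio);
    }
}
```

Even with such a verdict, the earlier caveat still applies: knowing where the bottleneck is does not guarantee that changing the parallelism will help.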



But long story short, we are probably diverging from the topic of this 
discussion, and we can discuss this at some later point.

For now, for sources:

As I wrote before, +1 for:
 - pending.bytes, Gauge
 - pending.messages, Gauge

When we are developing/discussing the SourceReader from FLIP-27, we might then
add:

- in-memory.buffer.usage (0 - 100%)

Flink would estimate this automatically, while the user would be able to
override it and provide a better estimation.
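A minimal sketch of what "estimated by Flink, overridable by the user" could look like. Nothing here is an actual Flink or FLIP-27 API: `BufferUsageGauge` is a hypothetical name, `readerAvailable` merely stands in for something like SourceReader#isAvailable, and the crude 0%/100% default follows the estimation I described earlier in this thread.

```java
import java.util.function.BooleanSupplier;
import java.util.function.DoubleSupplier;

/** Hypothetical gauge for in-memory.buffer.usage; not an actual Flink class. */
final class BufferUsageGauge {
    private final BooleanSupplier readerAvailable; // stand-in for SourceReader#isAvailable
    private final DoubleSupplier userEstimate;     // optional override in [0.0, 1.0]; may be null

    private BufferUsageGauge(BooleanSupplier readerAvailable, DoubleSupplier userEstimate) {
        this.readerAvailable = readerAvailable;
        this.userEstimate = userEstimate;
    }

    /** Crude default: 100% while the reader has data available, 0% otherwise. */
    static BufferUsageGauge estimatedByFlink(BooleanSupplier readerAvailable) {
        return new BufferUsageGauge(readerAvailable, null);
    }

    /** The connector provides its own, better estimation. */
    static BufferUsageGauge withOverride(BooleanSupplier readerAvailable, DoubleSupplier userEstimate) {
        return new BufferUsageGauge(readerAvailable, userEstimate);
    }

    /** Buffer usage in percent, clamped to [0, 100]. */
    double getValue() {
        double ratio = userEstimate != null
                ? userEstimate.getAsDouble()
                : (readerAvailable.getAsBoolean() ? 1.0 : 0.0);
        return 100.0 * Math.min(1.0, Math.max(0.0, ratio));
    }
}
```

Since only probed values are reported, even the crude default is informative: probes that almost always return 100% suggest back pressure at the source, and probes that almost always return 0% suggest there is none.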

Piotrek 

> On 5 Jun 2019, at 05:42, Becket Qin <becket....@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Thanks for the explanation. Please see some clarifications below.
> 
> By time-based metric, I meant the portion of time spent on producing the
> record to downstream. For example, a source connector can report that it's
> spending 80% of time to emit record to downstream processing pipeline. In
> another case, a sink connector may report that it's spending 30% of time
> producing the records to the external system.
> 
> This is in some sense equivalent to the buffer usage metric:
>   - 80% of time spent on emitting records to downstream ---> downstream
> node is bottleneck ---> output buffer is probably full.
>   - 30% of time spent on emitting records to downstream ---> downstream
> node is not bottleneck ---> output buffer is probably not full.
> 
> However, the time-based metric has a few advantages that the buffer usage
> metric may not have.
> 
> 1.  Buffer usage metric may not be applicable to all the connector
> implementations, while reporting time-based metric are always doable.
> Some source connectors may not have any input buffer, or they may use some
> third party library that does not expose the input buffer at all.
> Similarly, for sink connectors, the implementation may not have any output
> buffer, or the third party library does not expose such buffer.
> 
> 2. Although both type of metrics can detect bottleneck, time-based metrics
> can be used to generate a more informed action to remove the bottleneck.
> For example, when the downstream is bottleneck, the output buffer usage
> metric is likely to be 100%, and the input buffer usage might be 0%. That
> means we don't know what is the suitable parallelism to lift the
> bottleneck. The time-based metric, on the other hand, would give useful
> information, e.g. if 80% of time was spent on emitting records, we can
> roughly increase the downstream node parallelism by 4 times.
> 
> Admittedly, the time-based metrics are more expensive than buffer usage. So
> we may have to do some sampling to reduce the cost. But in general, using
> time-based metrics seems worth adding.
> 
> That being said, I don't think buffer usage metric and time-based metrics
> are mutually exclusive. We can probably have both. It is just that in
> practice, features like auto-scaling might prefer time-based metrics for
> the reason stated above.
> 
>> 1. Define the metrics that would allow us to manually detect bottlenecks.
> As I wrote, we already have them in most of the places, except of
> sources/sinks.
>> 2. Use those metrics, to automatically detect bottlenecks. Currently we
> are only automatically detecting back pressure and reporting it to the user
> in web UI (is it exposed as a metric at all?). Detecting the root cause of
> the back pressure (bottleneck) is one step further.
>> 3. Use the knowledge about where exactly the bottleneck is located, to
> try to do something with it.
> 
> As explained above, I think time-based metric also addresses item 1 and
> item 2.
> 
> Any thoughts?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> On Mon, Jun 3, 2019 at 4:14 PM Piotr Nowojski <pi...@ververica.com> wrote:
> 
>> Hi again :)
>> 
>>>  - pending.bytes, Gauge
>>>  - pending.messages, Gauge
>> 
>> 
>> +1
>> 
>> And true, instead of overloading one of the metric it is better when user
>> can choose to provide only one of them.
>> 
>> Re 2:
>> 
>>> If I understand correctly, this metric along with the pending messages /
>>> bytes would answer the questions of:
>> 
>>> - Does the connector consume fast enough? Lagging behind + empty buffer
>> =
>>> cannot consume fast enough.
>>> - Does the connector emit fast enough? Lagging behind + full buffer =
>>> cannot emit fast enough, i.e. the Flink pipeline is slow.
>> 
>> Yes, exactly. This can also be used to support decisions like changing the
>> parallelism of the sources and/or down stream operators.
>> 
>> I’m not sure if I understand your proposal with time based measurements.
>> Maybe I’m missing something, but I do not see how measuring time alone
>> could answer the problem: where is the bottleneck. Time spent on the
>> next/emit might be short or long (depending on how heavy to process the
>> record is) and the source can still be bottlenecked/back pressured or not.
>> Usually the easiest and the most reliable way how to detect bottlenecks is
>> by checking usage of input & output buffers, since when input buffer is
>> full while output buffer is empty, that’s the definition of a bottleneck.
>> Also this is usually very easy and cheap to measure (it works effectively
>> the same way as current’s Flink back pressure monitoring, but more cleanly,
>> without probing thread’s stack traces).
>> 
>> Also keep in mind that we are already using the buffer usage metrics for
>> detecting the bottlenecks in Flink’s internal network exchanges (manual
>> work). That’s the reason why I wanted to extend this to sources/sinks,
>> since they are currently our blind spot.
>> 
>>> One feature we are currently working on to scale Flink automatically
>> relies
>>> on some metrics answering the same question
>> 
>> That would be very helpful feature. I think in order to achieve that we
>> would need to:
>> 1. Define the metrics that would allow us to manually detect bottlenecks.
>> As I wrote, we already have them in most of the places, except of
>> sources/sinks.
>> 2. Use those metrics, to automatically detect bottlenecks. Currently we
>> are only automatically detecting back pressure and reporting it to the user
>> in web UI (is it exposed as a metric at all?). Detecting the root cause of
>> the back pressure (bottleneck) is one step further.
>> 3. Use the knowledge about where exactly the bottleneck is located, to try
>> to do something with it.
>> 
>> I think you are aiming for point 3., but before we reach it, we are still
>> missing 1. & 2. Also even if we have 3., there is a value in 1 & 2 for
>> manual analysis/dashboards.
>> 
>> However, having the knowledge of where the bottleneck is, doesn’t
>> necessarily mean that we know what we can do about it. For example
>> increasing parallelism might or might not help with anything (data skew,
>> bottleneck on some resource that does not scale), but this remark applies
>> always, regardless of the way how did we detect the bottleneck.
>> 
>> Piotrek
>> 
>>> On 3 Jun 2019, at 06:16, Becket Qin <becket....@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks for the suggestion. Some thoughts below:
>>> 
>>> Re 1: The pending messages / bytes.
>>> I completely agree these are very useful metrics and we should expect the
>>> connector to report. WRT the way to expose them, it seems more consistent
>>> to add two metrics instead of adding a method (unless there are other use
>>> cases other than metric reporting). So we can add the following two
>> metrics.
>>>  - pending.bytes, Gauge
>>>  - pending.messages, Gauge
>>> Applicable connectors can choose to report them. These two metrics along
>>> with latency should be sufficient for users to understand the progress
>> of a
>>> connector.
>>> 
>>> 
>>> Re 2: Number of buffered data in-memory of the connector
>>> If I understand correctly, this metric along with the pending messages /
>>> bytes would answer the questions of:
>>> - Does the connector consume fast enough? Lagging behind + empty buffer
>> =
>>> cannot consume fast enough.
>>> - Does the connector emit fast enough? Lagging behind + full buffer =
>>> cannot emit fast enough, i.e. the Flink pipeline is slow.
>>> 
>>> One feature we are currently working on to scale Flink automatically
>> relies
>>> on some metrics answering the same question, more specifically, we are
>>> profiling the time spent on .next() method (time to consume) and the time
>>> spent on .collect() method (time to emit / process). One advantage of
>> such
>>> method level time cost allows us to calculate the parallelism required to
>>> keep up in case there is a lag.
>>> 
>>> However, one concern I have regarding such metric is that they are
>>> implementation specific. Either profiling on the method time, or
>> reporting
>>> buffer usage assumes the connector are implemented in such a way. A
>>> slightly better solution might be have the following metric:
>>> 
>>>    - EmitTimeRatio (or FetchTimeRatio): The time spent on emitting
>>> records / Total time elapsed.
>>> 
>>> This assumes that the source connectors have to emit the records to the
>>> downstream at some point. The emission may take some time ( e.g. go
>> through
>>> chained operators). And the rest of the time are spent to prepare the
>>> record to emit, including time for consuming and format conversion, etc.
>>> Ideally, we'd like to see the time spent on record fetch and emit to be
>>> about the same, so no one is bottleneck for the other.
>>> 
>>> The downside of these time based metrics is additional overhead to get
>> the
>>> time, therefore sampling might be needed. But in practice I feel such
>> time
>>> based metric might be more useful if we want to take action.
>>> 
>>> 
>>> I think we should absolutely add metrics in (1) to the metric convention.
>>> We could also add the metrics mentioned in (2) if we reach consensus on
>>> that. What do you think?
>>> 
>>> Thanks,
>>> 
>>> Jiangjie (Becket) Qin
>>> 
>>> 
>>> On Fri, May 31, 2019 at 4:26 PM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>> 
>>>> Hey Becket,
>>>> 
>>>> Re 1a) and 1b) +1 from my side.
>>>> 
>>>> I’ve discussed this issue:
>>>> 
>>>>>>> 
>>>>>>> 2. It would be nice to have metrics, that allow us to check the cause
>>>> of
>>>>>>> back pressure:
>>>>>>> a) for sources, length of input queue (in bytes? Or boolean
>>>>>>> hasSomething/isEmpty)
>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean
>>>>>>> hasSomething/isEmpty)
>>>> 
>>>> With Nico at some lengths and he also saw the benefits of them. We also
>>>> have more concrete proposal for that.
>>>> 
>>>> Actually there are two really useful metrics, that we are missing
>>>> currently:
>>>> 
>>>> 1. Number of data/records/bytes in the backlog to process. For example
>>>> remaining number of bytes in unread files. Or pending data in Kafka
>> topics.
>>>> 2. Number of buffered data in-memory of the connector, that are waiting
>> to
>>>> be processed pushed to Flink pipeline.
>>>> 
>>>> Re 1:
>>>> This would have to be a metric provided directly by a connector. It
>> could
>>>> be an undefined `int`:
>>>> 
>>>> `int backlog` - estimate of pending work.
>>>> 
>>>> “Undefined” meaning that it would be up to a connector to decided
>> whether
>>>> it’s measured in bytes, records, pending files or whatever it is
>> possible
>>>> to provide by the connector. This is because I assume not every
>> connector
>>>> can provide exact number and for some of them it might be impossible to
>>>> provide records number of bytes count.
>>>> 
>>>> Re 2:
>>>> This metric could be either provided by a connector, or calculated
>> crudely
>>>> by Flink:
>>>> 
>>>> `float bufferUsage` - value from [0.0, 1.0] range
>>>> 
>>>> Percentage of used in memory buffers, like in Kafka’s handover.
>>>> 
>>>> It could be crudely implemented by Flink with FLIP-27
>>>> SourceReader#isAvailable. If SourceReader is not available reported
>>>> `bufferUsage` could be 0.0. If it is available, it could be 1.0. I think
>>>> this would be a good enough estimation for most of the use cases (that
>>>> could be overloaded and implemented better if desired). Especially
>> since we
>>>> are reporting only probed values: if probed values are almost always
>> “1.0”,
>>>> it would mean that we have a back pressure. If they are almost always
>>>> “0.0”, there is probably no back pressure at the sources.
>>>> 
>>>> What do you think about this?
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 30 May 2019, at 11:41, Becket Qin <becket....@gmail.com> wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> Thanks a lot for all the feedback and comments. I'd like to continue
>> the
>>>>> discussion on this FLIP.
>>>>> 
>>>>> I updated the FLIP-33 wiki to remove all the histogram metrics from the
>>>>> first version of metric convention due to the performance concern. The
>>>> plan
>>>>> is to introduce them later when we have a mechanism to opt in/out
>>>> metrics.
>>>>> At that point, users can decide whether they want to pay the cost to
>> get
>>>>> the metric or not.
>>>>> 
>>>>> As Stephan suggested, for this FLIP, let's first try to agree on the
>>>> small
>>>>> list of conventional metrics that connectors should follow.
>>>>> Just to be clear, the purpose of the convention is not to enforce every
>>>>> connector to report all these metrics, but to provide a guidance in
>> case
>>>>> these metrics are reported by some connectors.
>>>>> 
>>>>> 
>>>>> @ Stephan & Chesnay,
>>>>> 
>>>>> Regarding the duplication of `RecordsIn` metric in operator / task
>>>>> IOMetricGroups, from what I understand, for source operator, it is
>>>> actually
>>>>> the SourceFunction that reports the operator level
>>>>> RecordsIn/RecordsInPerSecond metric. So they are essentially the same
>>>>> metric in the operator level IOMetricGroup. Similarly for the Sink
>>>>> operator, the operator level RecordsOut/RecordsOutPerSecond metrics are
>>>>> also reported by the Sink function. I marked them as existing in the
>>>>> FLIP-33 wiki page. Please let me know if I misunderstood.
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jiangjie (Becket) Qin
>>>>> 
>>>>> 
>>>>> On Thu, May 30, 2019 at 5:16 PM Becket Qin <becket....@gmail.com>
>> wrote:
>>>>> 
>>>>>> Hi Piotr,
>>>>>> 
>>>>>> Thanks a lot for the feedback.
>>>>>> 
>>>>>> 1a) I guess you are referring to the part that "original system
>> specific
>>>>>> metrics should also be reported". The performance impact depends on
>> the
>>>>>> implementation. An efficient implementation would only record the
>> metric
>>>>>> once, but report them with two different metric names. This is
>> unlikely
>>>> to
>>>>>> hurt performance.
>>>>>> 
>>>>>> 1b) Yes, I agree that we should avoid adding overhead to the critical
>>>> path
>>>>>> by all means. This is sometimes a tradeoff, running blindly without
>> any
>>>>>> metric gives best performance, but sometimes might be frustrating when
>>>> we
>>>>>> debug some issues.
>>>>>> 
>>>>>> 2. The metrics are indeed very useful. Are they supposed to be
>> reported
>>>> by
>>>>>> the connectors or Flink itself? At this point FLIP-33 is more focused
>> on
>>>>>> provide a guidance to the connector authors on the metrics reporting.
>>>> That
>>>>>> said, after FLIP-27, I think we should absolutely report these metrics
>>>> in
>>>>>> the abstract implementation. In any case, the metric convention in
>> this
>>>>>> list are expected to evolve over time.
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jiangjie (Becket) Qin
>>>>>> 
>>>>>> On Tue, May 28, 2019 at 6:24 PM Piotr Nowojski <pi...@ververica.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi,
>>>>>>> 
>>>>>>> Thanks for the proposal and driving the effort here Becket :) I’ve
>> read
>>>>>>> through the FLIP-33 [1], and here are couple of my thoughts.
>>>>>>> 
>>>>>>> Big +1 for standardising the metric names between connectors, it will
>>>>>>> definitely help us and users a lot.
>>>>>>> 
>>>>>>> Issues/questions/things to discuss that I’ve thought of:
>>>>>>> 
>>>>>>> 1a. If we are about to duplicate some metrics, can this become a
>>>>>>> performance issue?
>>>>>>> 1b. Generally speaking, we should make sure that collecting those
>>>> metrics
>>>>>>> is as non intrusive as possible, especially that they will need to be
>>>>>>> updated once per record. (They might be collected more rarely with
>> some
>>>>>>> overhead, but the hot path of updating it per record will need to be
>> as
>>>>>>> quick as possible). That includes both avoiding heavy computation on
>>>> per
>>>>>>> record path: histograms?, measuring time for time based metrics (per
>>>>>>> second) (System.currentTimeMillis() depending on the implementation
>> can
>>>>>>> invoke a system call)
>>>>>>> 
>>>>>>> 2. It would be nice to have metrics, that allow us to check the cause
>>>> of
>>>>>>> back pressure:
>>>>>>> a) for sources, length of input queue (in bytes? Or boolean
>>>>>>> hasSomething/isEmpty)
>>>>>>> b) for sinks, length of output queue (in bytes? Or boolean
>>>>>>> hasSomething/isEmpty)
>>>>>>> 
>>>>>>> a) is useful in a scenario when we are processing backlog of records,
>>>> all
>>>>>>> of the internal Flink’s input/output network buffers are empty, and
>> we
>>>> want
>>>>>>> to check whether the external source system is the bottleneck
>> (source’s
>>>>>>> input queue will be empty), or if the Flink’s connector is the
>>>> bottleneck
>>>>>>> (source’s input queues will be full).
>>>>>>> b) similar story. Backlog of records, but this time all of the
>> internal
>>>>>>> Flink’s input/output network buffers are full, and we want to check
>>>> whether
>>>>>>> the external sink system is the bottleneck (sink output queues are
>>>> full),
>>>>>>> or if the Flink’s connector is the bottleneck (sink’s output queues
>>>> will be
>>>>>>> empty)
>>>>>>> 
>>>>>>> It might be sometimes difficult to provide those metrics, so they
>> could
>>>>>>> be optional, but if we could provide them, it would be really
>> helpful.
>>>>>>> 
>>>>>>> Piotrek
>>>>>>> 
>>>>>>> [1]
>>>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics
>>>>>>> <
>>>>>>> 
>>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33:+Standardize+Connector+Metrics
>>>>>>>> 
>>>>>>> 
>>>>>>>> On 24 Apr 2019, at 13:28, Stephan Ewen <se...@apache.org> wrote:
>>>>>>>> 
>>>>>>>> I think this sounds reasonable.
>>>>>>>> 
>>>>>>>> Let's keep the "reconfiguration without stopping the job" out of
>> this,
>>>>>>>> because that would be a super big effort and if we approach that,
>> then
>>>>>>> in
>>>>>>>> more generic way rather than specific to connector metrics.
>>>>>>>> 
>>>>>>>> I would suggest to look at the following things before starting with
>>>> any
>>>>>>>> implementation work:
>>>>>>>> 
>>>>>>>> - Try and find a committer to support this, otherwise it will be
>> hard
>>>>>>> to
>>>>>>>> make progress
>>>>>>>> - Start with defining a smaller set of "core metrics" and extend the
>>>>>>> set
>>>>>>>> later. I think that is easier than now blocking on reaching
>> consensus
>>>>>>> on a
>>>>>>>> large group of metrics.
>>>>>>>> - Find a solution to the problem Chesnay mentioned, that the
>> "records
>>>>>>> in"
>>>>>>>> metric is somehow overloaded and exists already in the IO Metric
>>>> group.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mon, Mar 25, 2019 at 7:16 AM Becket Qin <becket....@gmail.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Stephan,
>>>>>>>>> 
>>>>>>>>> Thanks a lot for the feedback. All makes sense.
>>>>>>>>> 
>>>>>>>>> It is a good suggestion to simply have an onRecord(numBytes,
>>>> eventTime)
>>>>>>>>> method for connector writers. It should meet most of the
>>>> requirements,
>>>>>>>>> individual
>>>>>>>>> 
>>>>>>>>> The configurable metrics feature is something really useful,
>>>>>>> especially if
>>>>>>>>> we can somehow make it dynamically configurable without stopping
>> the
>>>>>>> jobs.
>>>>>>>>> It might be better to make it a separate discussion because it is a
>>>>>>> more
>>>>>>>>> generic feature instead of only for connectors.
>>>>>>>>> 
>>>>>>>>> So in order to make some progress, in this FLIP we can limit the
>>>>>>> discussion
>>>>>>>>> scope to the connector related items:
>>>>>>>>> 
>>>>>>>>> - the standard connector metric names and types.
>>>>>>>>> - the abstract ConnectorMetricHandler interface
>>>>>>>>> 
>>>>>>>>> I'll start a separate thread to discuss other general metric
>> related
>>>>>>>>> enhancement items including:
>>>>>>>>> 
>>>>>>>>> - optional metrics
>>>>>>>>> - dynamic metric configuration
>>>>>>>>> - potential combination with rate limiter
>>>>>>>>> 
>>>>>>>>> Does this plan sound reasonable?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> 
>>>>>>>>> On Sat, Mar 23, 2019 at 5:53 AM Stephan Ewen <se...@apache.org>
>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Ignoring for a moment implementation details, this connector
>> metrics
>>>>>>> work
>>>>>>>>>> is a really good thing to do, in my opinion
>>>>>>>>>> 
>>>>>>>>>> The questions "oh, my job seems to be doing nothing, I am looking
>> at
>>>>>>> the
>>>>>>>>> UI
>>>>>>>>>> and the 'records in' value is still zero" is in the top three
>>>> support
>>>>>>>>>> questions I have been asked personally.
>>>>>>>>>> Introspection into "how far is the consumer lagging behind" (event
>>>>>>> time
>>>>>>>>>> fetch latency) came up many times as well.
>>>>>>>>>> 
>>>>>>>>>> So big +1 to solving this problem.
>>>>>>>>>> 
>>>>>>>>>> About the exact design - I would try to go for the following
>>>>>>> properties:
>>>>>>>>>> 
>>>>>>>>>> - keep complexity out of connectors. Ideally the metrics handler
>> has
>>>> a
>>>>>>>>>> single onRecord(numBytes, eventTime) method or so, and everything
>>>>>>> else is
>>>>>>>>>> internal to the handler. That makes it dead simple for the
>>>> connector.
>>>>>>> We
>>>>>>>>>> can also think of an extensive scheme for connector specific
>>>> metrics.
>>>>>>>>>> 
>>>>>>>>>> - make it configurable on the job or cluster level which metrics
>> the
>>>>>>>>>> handler internally creates when that method is invoked.
>>>>>>>>>> 
>>>>>>>>>> What do you think?
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Stephan
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Mar 21, 2019 at 10:42 AM Chesnay Schepler <
>>>> ches...@apache.org
>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> As I said before, I believe this to be over-engineered and have
>> no
>>>>>>>>>>> interest in this implementation.
>>>>>>>>>>> 
>>>>>>>>>>> There are conceptual issues like defining a duplicate
>>>>>>>>> numBytesIn(PerSec)
>>>>>>>>>>> metric that already exists for each operator.
>>>>>>>>>>> 
>>>>>>>>>>> On 21.03.2019 06:13, Becket Qin wrote:
>>>>>>>>>>>> A few updates to the thread. I uploaded a patch[1] as a complete
>>>>>>>>>>>> example of how users can use the metrics in the future.
>>>>>>>>>>>> 
>>>>>>>>>>>> Some thoughts below after taking a look at the
>> AbstractMetricGroup
>>>>>>>>> and
>>>>>>>>>>>> its subclasses.
>>>>>>>>>>>> 
>>>>>>>>>>>> This patch intends to provide convenience for Flink connector
>>>>>>>>>>>> implementations to follow metrics standards proposed in FLIP-33.
>>>> It
>>>>>>>>>>>> also try to enhance the metric management in general way to help
>>>>>>>>> users
>>>>>>>>>>>> with:
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. metric definition
>>>>>>>>>>>> 2. metric dependencies check
>>>>>>>>>>>> 3. metric validation
>>>>>>>>>>>> 4. metric control (turn on / off particular metrics)
>>>>>>>>>>>> 
>>>>>>>>>>>> This patch wraps |MetricGroup| to extend the functionality of
>>>>>>>>>>>> |AbstractMetricGroup| and its subclasses. The
>>>>>>>>>>>> |AbstractMetricGroup| mainly focus on the metric group
>> hierarchy,
>>>>>>> but
>>>>>>>>>>>> does not really manage the metrics other than keeping them in a
>>>> Map.
>>>>>>>>>>>> 
>>>>>>>>>>>> Ideally we should only have one entry point for the metrics.
>>>>>>>>>>>> 
>>>>>>>>>>>> Right now the entry point is |AbstractMetricGroup|. However,
>>>> besides
>>>>>>>>>>>> the missing functionality mentioned above, |AbstractMetricGroup|
>>>>>>>>> seems
>>>>>>>>>>>> deeply rooted in Flink runtime. We could extract it out to
>>>>>>>>>>>> flink-metrics in order to use it for generic purpose. There will
>>>> be
>>>>>>>>>>>> some work, though.
>>>>>>>>>>>> 
>>>>>>>>>>>> Another approach is to make |AbstractMetrics| in this patch as
>> the
>>>>>>>>>>>> metric entry point. It wraps metric group and provides the
>> missing
>>>>>>>>>>>> functionalities. Then we can roll out this pattern to runtime
>>>>>>>>>>>> components gradually as well.
>>>>>>>>>>>> 
>>>>>>>>>>>> My first thought is that the latter approach gives a more smooth
>>>>>>>>>>>> migration. But I am also OK with doing a refactoring on the
>>>>>>>>>>>> |AbstractMetricGroup| family.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> 
>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>> 
>>>>>>>>>>>> [1] https://github.com/becketqin/flink/pull/1
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, Feb 25, 2019 at 2:32 PM Becket Qin <
>> becket....@gmail.com
>>>>>>>>>>>> <mailto:becket....@gmail.com>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi Chesnay,
>>>>>>>>>>>> 
>>>>>>>>>>>> It might be easier to discuss some implementation details in
>> the
>>>>>>>>>>>> PR review instead of in the FLIP discussion thread. I have a
>>>>>>>>> patch
>>>>>>>>>>>> for Kafka connectors ready but haven't submitted the PR yet.
>>>>>>>>>>>> Hopefully that will help explain a bit more.
>>>>>>>>>>>> 
>>>>>>>>>>>> ** Re: metric type binding
>>>>>>>>>>>> This is a valid point that is worth discussing. If I understand
>>>>>>>>>>>> correctly, there are two points:
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. Metric type / interface does not matter as long as the
>> metric
>>>>>>>>>>>> semantic is clearly defined.
>>>>>>>>>>>> Conceptually speaking, I agree that as long as the metric
>>>>>>>>> semantic
>>>>>>>>>>>> is defined, metric type does not matter. To some extent, Gauge
>> /
>>>>>>>>>>>> Counter / Meter / Histogram themselves can be thought of as some
>>>>>>>>>>>> well-recognized semantics, if you wish. In Flink, these metric
>>>>>>>>>>>> semantics have their associated interface classes. In practice,
>>>>>>>>>>>> such semantic to interface binding seems necessary for
>> different
>>>>>>>>>>>> components to communicate.  Simply standardize the semantic of
>>>>>>>>> the
>>>>>>>>>>>> connector metrics seems not sufficient for people to build
>>>>>>>>>>>> ecosystem on top of. At the end of the day, we still need to
>>>> have
>>>>>>>>>>>> some embodiment of the metric semantics that people can program
>>>>>>>>>>>> against.
>>>>>>>>>>>> 
>>>>>>>>>>>> 2. Sometimes the same metric semantic can be exposed using
>>>>>>>>>>>> different metric types / interfaces.
>>>>>>>>>>>> This is a good point. Counter and Gauge-as-a-Counter are pretty
>>>>>>>>>>>> much interchangeable. This is more of a trade-off between the
>>>>>>>>> user
>>>>>>>>>>>> experience of metric producers and consumers. The metric
>>>>>>>>> producers
>>>>>>>>>>>> want to use Counter or Gauge depending on whether the counter
>> is
>>>>>>>>>>>> already tracked in code, while ideally the metric consumers
>> only
>>>>>>>>>>>> want to see a single metric type for each metric. I am leaning
>>>>>>>>>>>> towards to make the metric producers happy, i.e. allow Gauge /
>>>>>>>>>>>> Counter metric type, and let the metric consumers handle the
>>>> type
>>>>>>>>>>>> variation. The reason is that in practice, there might be more
>>>>>>>>>>>> connector implementations than metric reporter implementations.
>>>>>>>>> We
>>>>>>>>>>>> could also provide some helper method to facilitate reading
>> from
>>>>>>>>>>>> such variable metric type.
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Just some quick replies to the comments around implementation
>>>>>>>>>>> details.
>>>>>>>>>>>> 
>>>>>>>>>>>>     4) single place where metrics are registered except
>>>>>>>>>>>>     connector-specific
>>>>>>>>>>>>     ones (which we can't really avoid).
>>>>>>>>>>>> 
>>>>>>>>>>>> Register connector specific ones in a single place is actually
>>>>>>>>>>>> something that I want to achieve.
>>>>>>>>>>>> 
>>>>>>>>>>>>     2) I'm talking about time-series databases like Prometheus.
>>>>>>>>> We
>>>>>>>>>>>>     would
>>>>>>>>>>>>     only have a gauge metric exposing the last
>>>> fetchTime/emitTime
>>>>>>>>>>>>     that is
>>>>>>>>>>>>     regularly reported to the backend (Prometheus), where a
>> user
>>>>>>>>>>>>     could build
>>>>>>>>>>>>     a histogram of his choosing when/if he wants it.
>>>>>>>>>>>> 
>>>>>>>>>>>> Not sure if such downsampling works. As an example, if a user
>>>>>>>>>>>> complains that there are some intermittent latency spikes
>> (maybe
>>>>>>>>> a
>>>>>>>>>>>> few records in 10 seconds) in their processing system. Having a
>>>>>>>>>>>> Gauge sampling instantaneous latency seems unlikely useful.
>>>>>>>>>>>> However by looking at actual 99.9 percentile latency might
>> help.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> 
>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Feb 22, 2019 at 9:30 PM Chesnay Schepler
>>>>>>>>>>>> <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>     Re: over complication of implementation.
>>>>>>>>>>>> 
>>>>>>>>>>>>     I think I understand better now what you're shooting
>>>>>>>>> for,
>>>>>>>>>>>>     effectively something like the OperatorIOMetricGroup.
>>>>>>>>>>>>     But still, re-define setupConnectorMetrics() to accept a
>> set
>>>>>>>>>>>>     of flags
>>>>>>>>>>>>     for counters/meters (and _possibly_ histograms) along with a
>>>>>>>>>>>>     set of
>>>>>>>>>>>>     well-defined Optional<Gauge<?>>, and return the group.
>>>>>>>>>>>> 
>>>>>>>>>>>>     Solves all issues as far as I can tell:
>>>>>>>>>>>>     1) no metrics must be created manually (except Gauges, which
>>>>>>>>>>>>     are effectively just Suppliers and you can't get around this),
>>>>>>>>>>>>     2) additional metrics can be registered on the returned group,
>>>>>>>>>>>>     3) see 1),
>>>>>>>>>>>>     4) single place where metrics are registered except
>>>>>>>>>>>>     connector-specific
>>>>>>>>>>>>     ones (which we can't really avoid).
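A rough sketch of what the suggested setupConnectorMetrics() shape could look like: flags for the standard counters, an optional gauge, and the group handed back for connector-specific metrics. The types below (StandardCounter, Counter, ConnectorMetricGroup) are simplified stand-ins invented for this sketch so that it compiles on its own. They are not Flink's MetricGroup API, and using fetchLatency as the optional gauge is only an example.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

class ConnectorMetricsSketch {

    /** Hypothetical flags for the standard counters a connector can opt in to. */
    enum StandardCounter {
        NUM_RECORDS_IN("numRecordsIn"),
        NUM_BYTES_IN("numBytesIn");

        final String metricName;

        StandardCounter(String metricName) {
            this.metricName = metricName;
        }
    }

    /** Minimal stand-in for a counter metric. */
    static final class Counter {
        private long count;

        void inc() { count++; }

        long getCount() { return count; }
    }

    /** Simplified stand-in for a metric group: one scope holding counters and gauges. */
    static final class ConnectorMetricGroup {
        private final String scope;
        private final Map<String, Counter> counters = new LinkedHashMap<>();
        private final Map<String, Supplier<?>> gauges = new LinkedHashMap<>();

        ConnectorMetricGroup(String scope) { this.scope = scope; }

        String scope() { return scope; }

        /** Returns the counter registered under this name, creating it on first use. */
        Counter counter(String name) {
            return counters.computeIfAbsent(name, n -> new Counter());
        }

        /** Gauges are just suppliers, so the caller always has to provide them. */
        void gauge(String name, Supplier<?> gauge) { gauges.put(name, gauge); }

        boolean has(String name) {
            return counters.containsKey(name) || gauges.containsKey(name);
        }
    }

    /** Single place where the standard metrics are created, under one scope. */
    static ConnectorMetricGroup setupConnectorMetrics(
            String operatorScope,
            String connectorName,
            Set<StandardCounter> counters,
            Optional<Supplier<Long>> fetchLatency) {
        ConnectorMetricGroup group =
                new ConnectorMetricGroup(operatorScope + "." + connectorName);
        for (StandardCounter c : counters) {
            group.counter(c.metricName);
        }
        fetchLatency.ifPresent(g -> group.gauge("fetchLatency", g));
        return group;
    }

    public static void main(String[] args) {
        ConnectorMetricGroup group = setupConnectorMetrics(
                "operator", "foo",
                EnumSet.of(StandardCounter.NUM_RECORDS_IN), Optional.empty());
        group.counter("numRecordsIn").inc();
        // Connector-specific metrics go on the returned group, so they share the scope.
        group.gauge("fooSpecificLag", () -> 42L);
        System.out.println(group.scope() + " has numBytesIn: " + group.has("numBytesIn"));
    }
}
```

The point of returning the group is that connector-specific metrics end up under the same scope as the standard ones without the connector having to rebuild that scope itself.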
>>>>>>>>>>>> 
>>>>>>>>>>>>     Re: Histogram
>>>>>>>>>>>> 
>>>>>>>>>>>>     1) As an example, whether "numRecordsIn" is exposed as a
>>>>>>>>>>>>     Counter or a Gauge should be irrelevant. So far we're using
>>>>>>>>>>>>     the metric type that is the most convenient for exposing a
>>>>>>>>>>>>     given value. If there is some backing data structure that we
>>>>>>>>>>>>     want to expose some data from, we typically opt for a Gauge,
>>>>>>>>>>>>     as otherwise we're just mucking around with the Meter/Counter
>>>>>>>>>>>>     API to get it to match. Similarly, if we want to count
>>>>>>>>>>>>     something but no current count exists, we typically add a
>>>>>>>>>>>>     Counter. That's why attaching semantics to metric types makes
>>>>>>>>>>>>     little sense (but unfortunately several reporters already do
>>>>>>>>>>>>     it); for counters/meters certainly, but the majority of
>>>>>>>>>>>>     metrics are gauges.
>>>>>>>>>>>> 
>>>>>>>>>>>>     2) I'm talking about time-series databases like Prometheus.
>>>>>>>>>>>>     We would only have a gauge metric exposing the last
>>>>>>>>>>>>     fetchTime/emitTime that is regularly reported to the backend
>>>>>>>>>>>>     (Prometheus), where a user could build a histogram of his
>>>>>>>>>>>>     choosing when/if he wants it.
>>>>>>>>>>>> 
>>>>>>>>>>>>     On 22.02.2019 13:57, Becket Qin wrote:
>>>>>>>>>>>>> Hi Chesnay,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for the explanation.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> ** Re: FLIP
>>>>>>>>>>>>> I might have misunderstood this, but it seems that "major changes"
>>>>>>>>>>>>> are well defined in FLIP. The full content is as follows:
>>>>>>>>>>>>> What is considered a "major change" that needs a FLIP?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Any of the following should be considered a major change:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - Any major new feature, subsystem, or piece of
>>>>>>>>>>>>     functionality
>>>>>>>>>>>>> - *Any change that impacts the public interfaces of the
>>>>>>>>>>>>     project*
>>>>>>>>>>>>> 
>>>>>>>>>>>>> What are the "public interfaces" of the project?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> *All of the following are public interfaces *that people
>>>>>>>>>>>>     build around:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - DataStream and DataSet API, including classes related
>>>>>>>>>>>>     to that, such as
>>>>>>>>>>>>> StreamExecutionEnvironment
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - Classes marked with the @Public annotation
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - On-disk binary formats, such as
>>>>>>>>> checkpoints/savepoints
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - User-facing scripts/command-line tools, i.e.
>>>>>>>>>>>>     bin/flink, Yarn scripts,
>>>>>>>>>>>>> Mesos scripts
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - Configuration settings
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - *Exposed monitoring information*
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> So any change to monitoring information is considered a public
>>>>>>>>>>>>> interface change, and any public interface change is considered a
>>>>>>>>>>>>> "major change".
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> ** Re: over complication of implementation.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This is more of an implementation detail that is not covered by the
>>>>>>>>>>>>> FLIP, but it may be worth discussing.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> First of all, I completely agree that we should use the
>>>>>>>>>>>>     simplest way to
>>>>>>>>>>>>> achieve our goal. To me the goal is the following:
>>>>>>>>>>>>> 1. Clear connector conventions and interfaces.
>>>>>>>>>>>>> 2. Ease of creating a connector.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Both of them are important to the prosperity of the
>>>>>>>>>>>>     connector ecosystem. So
>>>>>>>>>>>>> I'd rather abstract as much as possible on our side to make
>>>>>>>>>>>>     the connector
>>>>>>>>>>>>> developer's work lighter. Given this goal, a static util
>>>>>>>>>>>>     method approach
>>>>>>>>>>>>> might have a few drawbacks:
>>>>>>>>>>>>> 1. Users still have to construct the metrics by themselves. (And
>>>>>>>>>>>>> note that this might be error-prone by itself. For example, a
>>>>>>>>>>>>> custom wrapper around a Dropwizard meter may be used instead of
>>>>>>>>>>>>> MeterView.)
>>>>>>>>>>>>> 2. When connector-specific metrics are added, it is difficult to
>>>>>>>>>>>>> enforce the scope to be the same as for standard metrics.
>>>>>>>>>>>>> 3. It seems that a method proliferation is inevitable if we want
>>>>>>>>>>>>> to apply sanity checks, e.g. the metric numBytesIn was not
>>>>>>>>>>>>> registered for a meter.
>>>>>>>>>>>>> 4. Metrics are still defined in random places and hard to track.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The current PR I had was inspired by the Config system in Kafka,
>>>>>>>>>>>>> which I found pretty handy. In fact it is not only used by Kafka
>>>>>>>>>>>>> itself but even by some other projects that depend on Kafka. I am
>>>>>>>>>>>>> not saying this approach is perfect. But I think it is worth it to
>>>>>>>>>>>>> save work for connector writers and encourage more systematic
>>>>>>>>>>>>> implementations. That being said, I am fully open to suggestions.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Re: Histogram
>>>>>>>>>>>>> I think there are two orthogonal questions around those metrics:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 1. Regardless of the metric type, by just looking at the meaning of
>>>>>>>>>>>>> a metric, is it generic to all connectors? If the answer is yes, we
>>>>>>>>>>>>> should include the metric in the convention. Whether or not we
>>>>>>>>>>>>> include it in the convention, some connector implementations will
>>>>>>>>>>>>> emit such a metric. It is better to have a convention than to let
>>>>>>>>>>>>> each connector do random things.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2. If a standard metric is a histogram, what should we do?
>>>>>>>>>>>>> I agree that we should make it clear that using histograms carries
>>>>>>>>>>>>> a performance risk. But I do see histograms being useful in some
>>>>>>>>>>>>> fine-grained debugging where one does not have the luxury to stop
>>>>>>>>>>>>> the system and inject more inspection code. So the workaround I am
>>>>>>>>>>>>> thinking of is to provide some implementation suggestions. Assume
>>>>>>>>>>>>> later on we have a mechanism for selective metrics. In the abstract
>>>>>>>>>>>>> metrics class we can disable those metrics by default, so
>>>>>>>>>>>>> individual connector writers do not have to do anything (this is
>>>>>>>>>>>>> another advantage of having an AbstractMetrics instead of static
>>>>>>>>>>>>> util methods).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I am not sure I fully understand the histogram-in-the-backend
>>>>>>>>>>>>> approach. Can you explain a bit more? Do you mean emitting the raw
>>>>>>>>>>>>> data, e.g. fetchTime and emitTime, with each record and letting the
>>>>>>>>>>>>> histogram computation happen in the background? Or having the
>>>>>>>>>>>>> processing thread put the values into a queue and a separate thread
>>>>>>>>>>>>> poll from the queue and add them to the histogram?
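For reference, the second option in the question above (the processing thread hands values to a queue and a separate thread updates the histogram) could be sketched as follows. This is only an illustration of that one reading, using plain java.util.concurrent. The class name, the queue capacity, and the list standing in for a histogram are all assumptions, and it is not necessarily what "calculate the histogram in the backend" refers to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueuedHistogramSketch {
    // Sentinel telling the worker to exit; real samples must not use this value.
    private static final long STOP = Long.MIN_VALUE;

    private final BlockingQueue<Long> queue = new ArrayBlockingQueue<>(1024);
    // Stand-in for a real histogram: just the values in arrival order.
    private final List<Long> histogramValues = new ArrayList<>();
    private final Thread worker = new Thread(this::drain, "histogram-updater");

    QueuedHistogramSketch() {
        worker.start();
    }

    /** Called on the processing thread: never blocks, drops the sample if the queue is full. */
    boolean record(long valueMs) {
        return queue.offer(valueMs);
    }

    /** Runs on the worker thread: moves queued samples into the histogram stand-in. */
    private void drain() {
        try {
            while (true) {
                long value = queue.take();
                if (value == STOP) {
                    return;
                }
                histogramValues.add(value);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the worker once it has consumed everything queued so far, then returns the values. */
    List<Long> close() {
        try {
            queue.put(STOP);
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return histogramValues;
    }

    public static void main(String[] args) {
        QueuedHistogramSketch sketch = new QueuedHistogramSketch();
        for (long i = 1; i <= 100; i++) {
            sketch.record(i);
        }
        System.out.println("values recorded off the hot path: " + sketch.close().size());
    }
}
```

The trade-off in this sketch is that the hot path only pays for a non-blocking offer(), at the price of dropping samples when the queue is full.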
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Fri, Feb 22, 2019 at 4:34 PM Chesnay Schepler
>>>>>>>>>>>>     <ches...@apache.org <mailto:ches...@apache.org>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Re: Flip
>>>>>>>>>>>>>> The very first line under both the main header and Purpose section
>>>>>>>>>>>>>> describe Flips as "major changes", which this isn't.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Re: complication
>>>>>>>>>>>>>> I'm not arguing against standardization, but against an
>>>>>>>>>>>>>> over-complicated implementation when a static utility method would
>>>>>>>>>>>>>> be sufficient.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> public static void setupConnectorMetrics(
>>>>>>>>>>>>>> MetricGroup operatorMetricGroup,
>>>>>>>>>>>>>> String connectorName,
>>>>>>>>>>>>>> Optional<Gauge<Long>> numRecordsIn,
>>>>>>>>>>>>>> ...)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> This gives you all you need:
>>>>>>>>>>>>>> * a well-defined set of metrics for a connector to opt in to
>>>>>>>>>>>>>> * standardized naming schemes for scope and individual metrics
>>>>>>>>>>>>>> * standardized metric types (although personally I'm not interested
>>>>>>>>>>>>>> in that since metric types should be considered syntactic sugar)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Re: Configurable Histogram
>>>>>>>>>>>>>> If anything they _must_ be turned off by default, but the metric
>>>>>>>>>>>>>> system is already exposing so many options that I'm not too keen on
>>>>>>>>>>>>>> adding even more. You have also only addressed my first argument
>>>>>>>>>>>>>> against histograms (performance), the second one still stands
>>>>>>>>>>>>>> (calculate histogram in metric backends instead).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 21.02.2019 16:27, Becket Qin wrote:
>>>>>>>>>>>>>>> Hi Chesnay,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks for the comments. I think this is worthy of a FLIP because it
>>>>>>>>>>>>>>> is public API. According to the FLIP description a FLIP is required
>>>>>>>>>>>>>>> in case of:
>>>>>>>>>>>>>>>  - Any change that impacts the public interfaces of the project
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> and the following entry is found in the definition of "public
>>>>>>>>>>>>>>> interface".
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>  - Exposed monitoring information
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Metrics are critical to any production system. So a clear metric
>>>>>>>>>>>>>>> definition is important for any serious user. For an organization
>>>>>>>>>>>>>>> with a large Flink installation, a change in metrics means a great
>>>>>>>>>>>>>>> amount of work. So such changes do need to be fully discussed and
>>>>>>>>>>>>>>> documented.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> ** Re: Histogram.
>>>>>>>>>>>>>>> We can discuss whether there is a better way to expose metrics that
>>>>>>>>>>>>>>> are suitable for histograms. My micro-benchmark on various histogram
>>>>>>>>>>>>>>> implementations also indicates that they are significantly slower
>>>>>>>>>>>>>>> than other metric types. But I don't think that means never use
>>>>>>>>>>>>>>> histograms; it means use them with caution. For example, we can
>>>>>>>>>>>>>>> suggest that implementations turn them off by default and only turn
>>>>>>>>>>>>>>> them on for a short time when performing some micro-debugging.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> ** Re: complication:
>>>>>>>>>>>>>>> Connector conventions are essential for the Flink ecosystem. The
>>>>>>>>>>>>>>> pool of Flink connectors is probably the most important part of
>>>>>>>>>>>>>>> Flink, just like in any other data system. Clear conventions for
>>>>>>>>>>>>>>> connectors will help build the Flink ecosystem in a more organic
>>>>>>>>>>>>>>> way.
>>>>>>>>>>>>>>> Take the metrics convention as an example: imagine someone has
>>>>>>>>>>>>>>> developed a Flink connector for System foo, and another developer
>>>>>>>>>>>>>>> has developed a monitoring and diagnostic framework for Flink which
>>>>>>>>>>>>>>> analyzes Flink job performance based on metrics. With a clear
>>>>>>>>>>>>>>> metric convention, those two projects could be developed
>>>>>>>>>>>>>>> independently. Once users put them together, it would work without
>>>>>>>>>>>>>>> additional modifications. This cannot be easily achieved by just
>>>>>>>>>>>>>>> defining a few constants.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> ** Re: selective metrics:
>>>>>>>>>>>>>>> Sure, we can discuss that in a separate thread.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> @Dawid
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> ** Re: latency / fetchedLatency
>>>>>>>>>>>>>>> The primary purpose of establishing such a convention is to help
>>>>>>>>>>>>>>> developers write connectors in a more compatible way. The convention
>>>>>>>>>>>>>>> is supposed to be defined more proactively. So when looking at the
>>>>>>>>>>>>>>> convention, it seems more important to see if the concept is
>>>>>>>>>>>>>>> applicable to connectors in general. It might be true that so far
>>>>>>>>>>>>>>> only the Kafka connector reports latency. But there might be
>>>>>>>>>>>>>>> hundreds of other connector implementations in the Flink ecosystem,
>>>>>>>>>>>>>>> though not in the Flink repo, and some of them also emit latency. I
>>>>>>>>>>>>>>> think a lot of other sources actually also have an append timestamp,
>>>>>>>>>>>>>>> e.g. database bin logs and some K-V stores. So I wouldn't be
>>>>>>>>>>>>>>> surprised if some database connector can also emit latency metrics.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 10:14 PM Chesnay Schepler
>>>>>>>>>>>>     <ches...@apache.org <mailto:ches...@apache.org>>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regarding 2) It doesn't make sense to investigate this as part of
>>>>>>>>>>>>>>>> this FLIP. This is something that could be of interest for the
>>>>>>>>>>>>>>>> entire metric system, and should be designed as such.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Regarding the proposal as a whole:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Histogram metrics shall not be added to the core of Flink. They are
>>>>>>>>>>>>>>>> significantly more expensive than other metrics, and calculating
>>>>>>>>>>>>>>>> histograms in the application is regarded as an anti-pattern by
>>>>>>>>>>>>>>>> several metric backends, who instead recommend to expose the raw
>>>>>>>>>>>>>>>> data and calculate the histogram in the backend.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Second, this seems overly complicated. Given that we already
>>>>>>>>>>>>>>>> established that not all connectors will export all metrics we are
>>>>>>>>>>>>>>>> effectively reducing this down to a consistent naming scheme. We
>>>>>>>>>>>>>>>> don't need anything sophisticated for that; basically just a few
>>>>>>>>>>>>>>>> constants that all connectors use.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I'm not convinced that this is worthy of a FLIP.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On 21.02.2019 14:26, Dawid Wysakowicz wrote:
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Ad 1. In general I understand and I agree. But those particular
>>>>>>>>>>>>>>>>> metrics (latency, fetchLatency) would right now only be reported if
>>>>>>>>>>>>>>>>> the user uses KafkaConsumer with an internal timestampAssigner and
>>>>>>>>>>>>>>>>> StreamCharacteristic set to EventTime, right? That sounds like a
>>>>>>>>>>>>>>>>> very specific case. I am not sure if we should introduce a generic
>>>>>>>>>>>>>>>>> metric that will be disabled/absent for most implementations.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Ad 2. That sounds like an orthogonal issue that might make sense to
>>>>>>>>>>>>>>>>> investigate in the future.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On 21/02/2019 13:20, Becket Qin wrote:
>>>>>>>>>>>>>>>>>> Hi Dawid,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks for the feedback. That makes sense to me. There are two
>>>>>>>>>>>>>>>>>> cases to be addressed.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 1. The metrics are supposed to be guidance. It is likely that a
>>>>>>>>>>>>>>>>>> connector only supports some but not all of the metrics. In that
>>>>>>>>>>>>>>>>>> case, each connector implementation should have the freedom to
>>>>>>>>>>>>>>>>>> decide which metrics are reported. For the metrics that are
>>>>>>>>>>>>>>>>>> supported, the guidance should be followed.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 2. Sometimes users may want to disable certain metrics for some
>>>>>>>>>>>>>>>>>> reason (e.g. performance / reprocessing of data). A generic
>>>>>>>>>>>>>>>>>> mechanism should be provided to allow users to choose which
>>>>>>>>>>>>>>>>>> metrics are reported. This mechanism should also be honored by the
>>>>>>>>>>>>>>>>>> connector implementations.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Does this sound reasonable to you?
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Thu, Feb 21, 2019 at 4:22 PM Dawid Wysakowicz <
>>>>>>>>>>>>>>>> dwysakow...@apache.org <mailto:dwysakow...@apache.org>>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Generally I like the idea of having a unified, standard set of
>>>>>>>>>>>>>>>>>>> metrics for all connectors. I have some slight concerns about
>>>>>>>>>>>>>>>>>>> fetchLatency and latency though. They are computed based on
>>>>>>>>>>>>>>>>>>> EventTime, which is not a purely technical feature. It often
>>>>>>>>>>>>>>>>>>> depends on some business logic, and might be absent or defined
>>>>>>>>>>>>>>>>>>> after the source. Those metrics could also behave in a weird way
>>>>>>>>>>>>>>>>>>> when replaying a backlog. Therefore I am not sure if we should
>>>>>>>>>>>>>>>>>>> include those metrics by default. Maybe we could at least
>>>>>>>>>>>>>>>>>>> introduce a feature switch for them? What do you think?
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Dawid
>>>>>>>>>>>>>>>>>>> On 21/02/2019 03:13, Becket Qin wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Bump. If there are no objections to the proposed metrics, I'll
>>>>>>>>>>>>>>>>>>> start a voting thread later today.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Mon, Feb 11, 2019 at 8:17 PM Becket Qin
>>>>>>>>>>>>>>>>>>> <becket....@gmail.com <mailto:becket....@gmail.com>> wrote:
>>>>>>>>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I would like to start the FLIP discussion thread about
>>>>>>>>>>>>>>>>>>> standardizing the connector metrics.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> In short, we would like to provide a convention for Flink
>>>>>>>>>>>>>>>>>>> connector metrics. It will help simplify the monitoring and
>>>>>>>>>>>>>>>>>>> alerting on Flink jobs. The FLIP link is as follows:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 
