Hi everyone,
The source watermark metrics show the consumer latency of a Source.
They let the user judge the health of the job, and they can be used for
monitoring and alerting.
We should have the runner report the watermark metrics rather than
having the source report them using metrics. This addresses the fact that even
if the source has advanced to 8:00, the runner may still know about buffered
elements at 7:00, and so will not advance the watermark all the way to 8:00.
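To make the 8:00 / 7:00 example concrete, here is a minimal sketch in plain Java. It is only a simplified model of the idea, not how any particular runner computes its watermark; the class and method names are hypothetical and the date is arbitrary.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not a Beam API: models why the runner's view of the
// watermark can be behind what the source itself would report.
final class RunnerWatermarkSketch {

  // The reported watermark cannot pass the oldest element the runner still buffers.
  static Instant reportedWatermark(Instant sourceWatermark, List<Instant> bufferedTimestamps) {
    Instant result = sourceWatermark;
    for (Instant ts : bufferedTimestamps) {
      if (ts.isBefore(result)) {
        result = ts;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Instant eight = Instant.parse("2017-04-01T08:00:00Z"); // source has advanced to 8:00
    Instant seven = Instant.parse("2017-04-01T07:00:00Z"); // runner still buffers a 7:00 element
    // The runner-side value stays at 7:00 even though the source says 8:00.
    System.out.println(reportedWatermark(eight, Arrays.asList(seven)));
  }
}
```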
The metrics include:
1. Source watermark (`min` amongst all splits):
   type = Gauge, namespace = io, name = source_watermark
2. Source watermark per split:
   type = Gauge, namespace = io.splits, name = <split_id>.source_watermark
The min source watermark amongst all splits seems difficult to implement, since
some runners (like FlinkRunner) can't access all the splits to aggregate them,
and there is no such AggregatorMetric.
So we could report the watermark per split, and users could apply a `min`
aggregation to it in their metrics backends. However, as several people
mentioned in the IO metrics proposal, this could be problematic for
sources with many splits.
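As a rough illustration of the `min` aggregation a metrics backend would have to do, here is a sketch in plain Java. It assumes the backend exposes the per-split gauges as a map from metric name to a watermark in epoch milliseconds; that representation, and the class and method names, are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical backend-side aggregation, not part of Beam.
final class MinSplitWatermarkSketch {
  static final String SUFFIX = ".source_watermark";

  // Returns the smallest per-split watermark, or empty if no split reported one.
  static OptionalLong minSourceWatermark(Map<String, Long> gauges) {
    return gauges.entrySet().stream()
        .filter(e -> e.getKey().endsWith(SUFFIX))
        .mapToLong(e -> e.getValue().longValue())
        .min();
  }

  public static void main(String[] args) {
    Map<String, Long> gauges = new HashMap<>();
    gauges.put("split-a.source_watermark", 800L);
    gauges.put("split-b.source_watermark", 700L);
    System.out.println(minSourceWatermark(gauges));
  }
}
```

The cost of this approach grows with the number of splits, since every split contributes its own gauge.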
So we add a check when reporting metrics to handle the case of too many splits.
{code}
if (splitsNum <= METRIC_MAX_SPLITS) {
  // set the sourceWatermarkOfSplit
}
{code}
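Spelled out a little further, the check above could look like the sketch below. It is plain Java with hypothetical names and does not use the Beam metrics API; the limit is passed in as a parameter because its value is exactly what is still open.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical reporter, for illustration only.
final class SplitWatermarkReporterSketch {

  // Returns the per-split gauges to publish, or nothing if there are too many splits.
  static Map<String, Long> gaugesToReport(Map<String, Long> watermarkBySplit, int maxSplits) {
    Map<String, Long> gauges = new LinkedHashMap<>();
    if (watermarkBySplit.size() <= maxSplits) {
      for (Map.Entry<String, Long> e : watermarkBySplit.entrySet()) {
        // name = <split_id>.source_watermark, as in the proposal
        gauges.put(e.getKey() + ".source_watermark", e.getValue());
      }
    }
    return gauges;
  }

  public static void main(String[] args) {
    Map<String, Long> watermarks = new LinkedHashMap<>();
    watermarks.put("s1", 10L);
    watermarks.put("s2", 20L);
    System.out.println(gaugesToReport(watermarks, 2)); // within the limit: reported
    System.out.println(gaugesToReport(watermarks, 1)); // over the limit: skipped
  }
}
```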
So I'd like to start a discussion about the implementation of source watermark metrics,
and specifically about how many splits is too many (the value of METRIC_MAX_SPLITS).
JIRA:
IO metrics (https://issues.apache.org/jira/browse/BEAM-1919)
Source watermark (https://issues.apache.org/jira/browse/BEAM-1941)