[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16325030#comment-16325030 ]

Tzu-Li (Gordon) Tai commented on FLINK-8162:

Merged for 1.5: 8e23264a4511d33723a756abc209c289fafbe97d.

Thanks for the contribution [~casidiablo]!

> Kinesis Connector to report millisBehindLatest metric
> -----------------------------------------------------
>
>                 Key: FLINK-8162
>                 URL: https://issues.apache.org/jira/browse/FLINK-8162
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Cristian
>            Priority: Minor
>              Labels: kinesis
>             Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is
> "MillisBehindLatest" (see
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge,
> tagging it with the shard id.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323953#comment-16323953 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5182
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323675#comment-16323675 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5182

    Hi @casidiablo, just before merging, I did some refactoring of your PR to move the metrics reporting business out of the `ShardConsumer`: https://github.com/apache/flink/commit/14b01be8ec1c00b867aea8dc758c23fe247e05f4.

    Let me know if you have any objections to that; if not, I'll proceed to merge soon.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323447#comment-16323447 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5182

    Thanks @casidiablo.
    Merging ...
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321497#comment-16321497 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841610

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
             }
         }

    +    /**
    +     * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +     */
    +    private MetricGroup registerMetricGroupForShard(KinesisStreamShardState shardState) {
    +        return runtimeContext
    +            .getMetricGroup()
    +            .addGroup("Kinesis")
    +            .addGroup("shardId", shardState.getStreamShardHandle().getShard().getShardId());
    --- End diff --

    Great idea.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321495#comment-16321495 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841457

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
             }
         }

    +    /**
    +     * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +     */
    +    private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) {
    +        return runtimeContext
    +            .getMetricGroup()
    +            .addGroup("Kinesis")
    --- End diff --

    Ah I see, that makes sense then.
    Could you then name it as `KinesisConsumer`, to be more consistent?
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321494#comment-16321494 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160841254

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
             }
         }

    +    /**
    +     * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +     */
    +    private MetricGroup registerMetricGroupForShard(KinesisStreamShardState shardState) {
    +        return runtimeContext
    +            .getMetricGroup()
    +            .addGroup("Kinesis")
    +            .addGroup("shardId", shardState.getStreamShardHandle().getShard().getShardId());
    --- End diff --

    We probably should also add a `stream` group before `shardId`, since the Kinesis consumer allows subscribing to multiple streams.
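The scoping suggested in the review comments above (a `KinesisConsumer` group, then `stream`, then `shardId`) can be illustrated with a small sketch. It does not use Flink itself: `SketchMetricGroup` is a hypothetical stand-in that only mimics the two `addGroup` shapes seen in the diff, and the identifier format, the `groupForShard` helper, and the stream names are assumptions made for illustration. The point is only that two streams with the same shard id end up under distinct scopes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a metric group; NOT Flink's MetricGroup. */
final class SketchMetricGroup {
    private final List<String> scope;
    private final Map<String, String> variables;

    SketchMetricGroup() {
        this(new ArrayList<>(), new LinkedHashMap<>());
    }

    private SketchMetricGroup(List<String> scope, Map<String, String> variables) {
        this.scope = scope;
        this.variables = variables;
    }

    /** Adds a plain name group, e.g. "KinesisConsumer". */
    SketchMetricGroup addGroup(String name) {
        List<String> childScope = new ArrayList<>(scope);
        childScope.add(name);
        return new SketchMetricGroup(childScope, new LinkedHashMap<>(variables));
    }

    /** Adds a key/value group; the pair is also recorded as a variable (tag). */
    SketchMetricGroup addGroup(String key, String value) {
        List<String> childScope = new ArrayList<>(scope);
        childScope.add(key);
        childScope.add(value);
        Map<String, String> childVariables = new LinkedHashMap<>(variables);
        childVariables.put(key, value);
        return new SketchMetricGroup(childScope, childVariables);
    }

    /** Dot-joined identifier; the exact format is an assumption of this sketch. */
    String identifier(String metricName) {
        return String.join(".", scope) + "." + metricName;
    }

    Map<String, String> variables() {
        return Collections.unmodifiableMap(variables);
    }
}

final class ShardScopeSketch {
    /** Builds the per-shard scope: KinesisConsumer -> stream -> shardId. */
    static SketchMetricGroup groupForShard(SketchMetricGroup root, String streamName, String shardId) {
        return root
            .addGroup("KinesisConsumer")
            .addGroup("stream", streamName)
            .addGroup("shardId", shardId);
    }
}
```

Without the `stream` level, the two shards in a usage such as `groupForShard(root, "orders", id)` and `groupForShard(root, "clicks", id)` would share one scope whenever `id` is the same, which is the collision the review comment is guarding against.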
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320811#comment-16320811 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160763163

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
             }
         }

    +    /**
    +     * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +     */
    +    private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) {
    +        return runtimeContext
    +            .getMetricGroup()
    +            .addGroup("Kinesis")
    --- End diff --

    I think so. Just following the convention of other components (e.g. [kafka consumer](https://github.com/apache/flink/blob/00ad0eb120026817c79a5fed2c71dd6aa10e9ba6/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L101))
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319865#comment-16319865 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160610484

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
             }
         }

    +    /**
    +     * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +     */
    +    private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) {
    +        return runtimeContext
    +            .getMetricGroup()
    +            .addGroup("Kinesis")
    --- End diff --

    is this necessary?
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319866#comment-16319866 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160610346

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
             }
         }

    +    /**
    +     * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +     */
    +    private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) {
    --- End diff --

    personal preference here: would prefer this to be named `registerMetricGroupForShard`.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316277#comment-16316277 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160139807

    --- Diff: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java ---
    @@ -107,7 +107,7 @@ public MetricGroup addGroup(String key, String value) {

         @Override
         public Map<String, String> getAllVariables() {
    -        return Collections.emptyMap();
    +        return new HashMap<>();
    --- End diff --

    I think it was before (tests were failing because I was using `...getAllVariables().put("shard_id")...`). I will revert and test.
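The test failures mentioned above follow from plain JDK behaviour rather than anything Flink-specific: the map returned by `Collections.emptyMap()` is immutable, so calling `put` on it throws `UnsupportedOperationException`, while a fresh `HashMap` accepts the entry. A minimal sketch of that difference, where the `EmptyMapSketch` class, the `rejectsPut` helper, and the shard id value are made up for illustration:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class EmptyMapSketch {
    /** Returns true if the given map refuses a put(), false if the entry was stored. */
    static boolean rejectsPut(Map<String, String> map) {
        try {
            map.put("shard_id", "shardId-000000000000");
            return false;
        } catch (UnsupportedOperationException e) {
            // Immutable maps such as Collections.emptyMap() end up here.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsPut(Collections.<String, String>emptyMap()));
        System.out.println(rejectsPut(new HashMap<>()));
    }
}
```

This is consistent with the direction the review takes: once the shard id is attached through `addGroup(key, value)` instead of mutating the variables map, the `new HashMap<>()` change is no longer needed.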
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316212#comment-16316212 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160132062

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java ---
    @@ -542,6 +545,16 @@ public int registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
             }
         }

    +    /**
    +     * Registers a metric group associated with the shard id of the provided {@link KinesisStreamShardState shardState}.
    +     */
    +    private MetricGroup buildMetricGroupForShard(KinesisStreamShardState shardState) {
    +        return runtimeContext
    +            .getMetricGroup()
    +            .addGroup("Kinesis")
    +            .addGroup("shard_id", shardState.getStreamShardHandle().getShard().getShardId());
    --- End diff --

    Is the underscore important? Would prefer shardId for consistency.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316211#comment-16316211 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160132296

    --- Diff: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java ---
    @@ -107,7 +107,7 @@ public MetricGroup addGroup(String key, String value) {

         @Override
         public Map<String, String> getAllVariables() {
    -        return Collections.emptyMap();
    +        return new HashMap<>();
    --- End diff --

    is this change necessary?
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316200#comment-16316200 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r160131414

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -70,35 +71,45 @@

         private Date initTimestamp;

    +    private long millisBehindLatest;
    +
         /**
          * Creates a shard consumer.
          *
          * @param fetcherRef reference to the owning fetcher
          * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
          * @param subscribedShard the shard this consumer is subscribed to
          * @param lastSequenceNum the sequence number in the shard to start consuming
    +     * @param kinesisMetricGroup the metric group to report to
          */
         public ShardConsumer(KinesisDataFetcher fetcherRef,
                             Integer subscribedShardStateIndex,
                             StreamShardHandle subscribedShard,
    -                        SequenceNumber lastSequenceNum) {
    +                        SequenceNumber lastSequenceNum,
    +                        MetricGroup kinesisMetricGroup) {
             this(fetcherRef,
                 subscribedShardStateIndex,
                 subscribedShard,
                 lastSequenceNum,
    -            KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
    +            KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
    +            kinesisMetricGroup);
         }

         /** This constructor is exposed for testing purposes. */
         protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                 Integer subscribedShardStateIndex,
                                 StreamShardHandle subscribedShard,
                                 SequenceNumber lastSequenceNum,
    -                            KinesisProxyInterface kinesis) {
    +                            KinesisProxyInterface kinesis,
    +                            MetricGroup kinesisMetricGroup) {
             this.fetcherRef = checkNotNull(fetcherRef);
             this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
             this.subscribedShard = checkNotNull(subscribedShard);
             this.lastSequenceNum = checkNotNull(lastSequenceNum);
    +
    +        checkNotNull(kinesisMetricGroup)
    +            .gauge("millisBehindLatest", () -> millisBehindLatest);
    --- End diff --

    yes you can use lambdas.
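The lambda in the diff above works because a gauge only needs a single value-returning method, so `() -> millisBehindLatest` can be re-evaluated on every read. The sketch below shows that pattern without depending on Flink: `SketchGauge`, `SketchGaugeRegistry`, `SketchShardConsumer`, and `onRecordsFetched` are hypothetical names, and marking the field `volatile` is an assumption of this sketch (the value is written by the consuming thread and may be read by a different reporting thread), not something stated in the thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in with the single-method shape a gauge needs; NOT Flink's Gauge. */
@FunctionalInterface
interface SketchGauge<T> {
    T getValue();
}

/** Hypothetical registry standing in for a metric group. */
final class SketchGaugeRegistry {
    private final Map<String, SketchGauge<Long>> gauges = new HashMap<>();

    void gauge(String name, SketchGauge<Long> gauge) {
        gauges.put(name, gauge);
    }

    /** Reads the current value by invoking the registered gauge. */
    long read(String name) {
        return gauges.get(name).getValue();
    }
}

final class SketchShardConsumer {
    // Assumption of this sketch: volatile, so a reporter thread sees the latest write.
    private volatile long millisBehindLatest;

    SketchShardConsumer(SketchGaugeRegistry metrics) {
        // The lambda captures 'this' and reads the field each time the gauge is polled.
        metrics.gauge("millisBehindLatest", () -> millisBehindLatest);
    }

    /** Hypothetical hook called after each fetch with the value from the response. */
    void onRecordsFetched(long millisBehindLatestFromResponse) {
        this.millisBehindLatest = millisBehindLatestFromResponse;
    }
}
```

Because the gauge is registered once and evaluated lazily, the consumer only has to update the field after each fetch; no re-registration is needed.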
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313653#comment-16313653 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r159950881

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -70,35 +71,45 @@

         private Date initTimestamp;

    +    private long millisBehindLatest;
    +
         /**
          * Creates a shard consumer.
          *
          * @param fetcherRef reference to the owning fetcher
          * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
          * @param subscribedShard the shard this consumer is subscribed to
          * @param lastSequenceNum the sequence number in the shard to start consuming
    +     * @param kinesisMetricGroup the metric group to report to
          */
         public ShardConsumer(KinesisDataFetcher fetcherRef,
                             Integer subscribedShardStateIndex,
                             StreamShardHandle subscribedShard,
    -                        SequenceNumber lastSequenceNum) {
    +                        SequenceNumber lastSequenceNum,
    +                        MetricGroup kinesisMetricGroup) {
             this(fetcherRef,
                 subscribedShardStateIndex,
                 subscribedShard,
                 lastSequenceNum,
    -            KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
    +            KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
    +            kinesisMetricGroup);
         }

         /** This constructor is exposed for testing purposes. */
         protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                 Integer subscribedShardStateIndex,
                                 StreamShardHandle subscribedShard,
                                 SequenceNumber lastSequenceNum,
    -                            KinesisProxyInterface kinesis) {
    +                            KinesisProxyInterface kinesis,
    +                            MetricGroup kinesisMetricGroup) {
             this.fetcherRef = checkNotNull(fetcherRef);
             this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
             this.subscribedShard = checkNotNull(subscribedShard);
             this.lastSequenceNum = checkNotNull(lastSequenceNum);
    +
    +        checkNotNull(kinesisMetricGroup)
    +            .gauge("millisBehindLatest", () -> millisBehindLatest);
    --- End diff --

    Bump 👊
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301479#comment-16301479 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158496021

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -70,35 +71,45 @@

         private Date initTimestamp;

    +    private long millisBehindLatest;
    +
         /**
          * Creates a shard consumer.
          *
          * @param fetcherRef reference to the owning fetcher
          * @param subscribedShardStateIndex the state index of the shard this consumer is subscribed to
          * @param subscribedShard the shard this consumer is subscribed to
          * @param lastSequenceNum the sequence number in the shard to start consuming
    +     * @param kinesisMetricGroup the metric group to report to
          */
         public ShardConsumer(KinesisDataFetcher fetcherRef,
                             Integer subscribedShardStateIndex,
                             StreamShardHandle subscribedShard,
    -                        SequenceNumber lastSequenceNum) {
    +                        SequenceNumber lastSequenceNum,
    +                        MetricGroup kinesisMetricGroup) {
             this(fetcherRef,
                 subscribedShardStateIndex,
                 subscribedShard,
                 lastSequenceNum,
    -            KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
    +            KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
    +            kinesisMetricGroup);
         }

         /** This constructor is exposed for testing purposes. */
         protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                 Integer subscribedShardStateIndex,
                                 StreamShardHandle subscribedShard,
                                 SequenceNumber lastSequenceNum,
    -                            KinesisProxyInterface kinesis) {
    +                            KinesisProxyInterface kinesis,
    +                            MetricGroup kinesisMetricGroup) {
             this.fetcherRef = checkNotNull(fetcherRef);
             this.subscribedShardStateIndex = checkNotNull(subscribedShardStateIndex);
             this.subscribedShard = checkNotNull(subscribedShard);
             this.lastSequenceNum = checkNotNull(lastSequenceNum);
    +
    +        checkNotNull(kinesisMetricGroup)
    +            .gauge("millisBehindLatest", () -> millisBehindLatest);
    --- End diff --

    Will this work? Am I allowed to use lambdas? What Java version has to be supported?
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299527#comment-16299527 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158194237

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                 SequenceNumber lastSequenceNum,
                                 KinesisProxyInterface kinesis) {
             this.fetcherRef = checkNotNull(fetcherRef);
    +        MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    --- End diff --

    I think we can't register the metric in `FlinkKinesisConsumer`, since we need it to be associated with a particular shard id. But I could do it from the `KinesisDataFetcher` instead, which already has access to the runtime context.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299514#comment-16299514 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158192923

    --- Diff: docs/monitoring/metrics.md ---
    @@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
    + Kinesis Connectors
    +
    +
    +
    + Scope
    + Metrics
    + Description
    + Type
    +
    +
    +
    +
    + Operator
    + millisBehindLatest
    + The number of milliseconds the GetRecords response is from the tip of the stream,
    --- End diff --

    I'm OK with changing it. That's actually just a copy and paste from the Amazon docs.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299238#comment-16299238 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158161267

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
            this.fetcherRef = checkNotNull(fetcherRef);
    +       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    +           .getMetricGroup()
    +           .addGroup("Kinesis")
    --- End diff --

    Would be best if @zentol also comments on this.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299236#comment-16299236 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158159640

    --- Diff: docs/monitoring/metrics.md ---
    @@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
    + Kinesis Connectors
    +
    +
    +
    + Scope
    + Metrics
    + Description
    + Type
    +
    +
    +
    +
    + Operator
    + millisBehindLatest
    + The number of milliseconds the GetRecords response is from the tip of the stream,
    --- End diff --

    Just a matter of preference here: I prefer the term "head of the stream" instead of tip. You can ignore this if you disagree.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299240#comment-16299240 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158161564

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
            this.fetcherRef = checkNotNull(fetcherRef);
    +       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    --- End diff --

    I feel that passing the `StreamingRuntimeContext` all the way here just to register metrics is not a good idea. Would it be possible to register the metrics in `FlinkKinesisConsumer` instead? That would also make it more visible which metrics the consumer exposes, without having to dig all the way down to this internal `ShardConsumer` thread.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299237#comment-16299237 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r158160866

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
            this.fetcherRef = checkNotNull(fetcherRef);
    +       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    +           .getMetricGroup()
    +           .addGroup("Kinesis")
    --- End diff --

    Do we really need this group? The metric is already bound to the current subtask, which should provide enough context that it is Kinesis-related since we're the Kinesis consumer, no?
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297661#comment-16297661 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r157907259

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
            this.fetcherRef = checkNotNull(fetcherRef);
    +       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    +           .getMetricGroup()
    +           .addGroup("Kinesis");
    +       kinesisMetricGroup
    --- End diff --

    Use `addGroup("shard_id", subscribedShard.getShard().getShardId())` instead and register the metric on the returned group.
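The suggestion above (derive a per-shard child group, then register the gauge on the group that is returned) can be sketched roughly as follows. This is a minimal, self-contained illustration, not the code from the pull request: `SketchMetricGroup` and `SketchShardConsumer` are hypothetical stand-ins rather than Flink's `MetricGroup` or `ShardConsumer`, and the dotted scope naming is an assumption made only so the result is easy to inspect.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative stand-ins only: these are NOT Flink's MetricGroup or ShardConsumer.
class ShardGaugeSketch {

    /** Hypothetical metric group: a scope string plus a shared gauge registry. */
    static final class SketchMetricGroup {
        private final String scope;
        private final Map<String, Supplier<Long>> registry;

        SketchMetricGroup(String scope, Map<String, Supplier<Long>> registry) {
            this.scope = scope;
            this.registry = registry;
        }

        /** Returns a child group whose scope carries the key/value pair. */
        SketchMetricGroup addGroup(String key, String value) {
            return new SketchMetricGroup(scope + "." + key + "." + value, registry);
        }

        /** Registers a gauge under this group's scope. */
        void gauge(String name, Supplier<Long> gauge) {
            registry.put(scope + "." + name, gauge);
        }
    }

    /** Hypothetical per-shard consumer that owns the value behind the gauge. */
    static final class SketchShardConsumer {
        // Written by the consumer thread, read by whichever thread reports metrics.
        private volatile long millisBehindLatest = -1L;

        SketchShardConsumer(SketchMetricGroup consumerGroup, String shardId) {
            // One child group per shard; the gauge is registered on the returned group.
            consumerGroup
                .addGroup("shard_id", shardId)
                .gauge("millisBehindLatest", () -> millisBehindLatest);
        }

        /** Called after each fetch with the lag reported for that fetch. */
        void onFetchResult(long reportedMillisBehindLatest) {
            this.millisBehindLatest = reportedMillisBehindLatest;
        }
    }

    public static void main(String[] args) {
        Map<String, Supplier<Long>> registry = new LinkedHashMap<>();
        SketchMetricGroup consumerGroup = new SketchMetricGroup("consumer", registry);

        SketchShardConsumer first = new SketchShardConsumer(consumerGroup, "shardId-000000000000");
        SketchShardConsumer second = new SketchShardConsumer(consumerGroup, "shardId-000000000001");
        first.onFetchResult(1200L);
        second.onFetchResult(0L);

        // Each shard reports its own value under its own scope.
        registry.forEach((name, gauge) -> System.out.println(name + " = " + gauge.get()));
    }
}
```

Because the gauge only reads a field, the consumer never has to push values to the metrics system; the reporter pulls the latest value whenever it polls, and each shard ends up with a separately scoped metric.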
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297662#comment-16297662 ]

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5182#discussion_r157907331

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
                                SequenceNumber lastSequenceNum,
                                KinesisProxyInterface kinesis) {
            this.fetcherRef = checkNotNull(fetcherRef);
    +       MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
    +           .getMetricGroup()
    +           .addGroup("Kinesis");
    +       kinesisMetricGroup
    +           .getAllVariables()
    +           .put("", subscribedShard.getShard().getShardId());
    +
    +       kinesisMetricGroup.gauge("millisBehindLatest", (Gauge) () -> millisBehindLatest);
    --- End diff --

    The cast shouldn't be necessary.
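To illustrate why the cast can be dropped: in Java 8 and later, a lambda passed to a generic registration method takes its target type from the bound on the type parameter, so an explicit cast to the functional interface is redundant. The sketch below uses a hypothetical `SketchGauge` interface and a `gauge` method whose generic shape is only assumed to resemble the one under review; neither is a Flink class.

```java
import java.util.HashMap;
import java.util.Map;

class GaugeLambdaSketch {

    /** Hypothetical single-method gauge interface (a stand-in, not Flink's Gauge). */
    @FunctionalInterface
    interface SketchGauge<T> {
        T getValue();
    }

    private static final Map<String, SketchGauge<?>> REGISTRY = new HashMap<>();

    // Value that the gauges below expose; updated elsewhere in a real consumer.
    private static volatile long millisBehindLatest = 1500L;

    /** Assumed shape: generic in both the value type and the concrete gauge type. */
    static <T, G extends SketchGauge<T>> G gauge(String name, G gauge) {
        REGISTRY.put(name, gauge);
        return gauge;
    }

    public static void main(String[] args) {
        // With an explicit cast to the functional interface.
        SketchGauge<Long> withCast =
            gauge("withCast", (SketchGauge<Long>) () -> millisBehindLatest);

        // Without the cast: the compiler infers the target type of the lambda.
        SketchGauge<Long> withoutCast = gauge("withoutCast", () -> millisBehindLatest);

        System.out.println(withCast.getValue() + " " + withoutCast.getValue());
    }
}
```

Both registrations behave identically at runtime; the cast only restates what the compiler already infers.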
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296973#comment-16296973 ]

ASF GitHub Bot commented on FLINK-8162:
---

GitHub user casidiablo opened a pull request:

    https://github.com/apache/flink/pull/5182

    [FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehindLatest metric

    ## What is the purpose of the change

    - Emits [Kinesis' millisBehindLatest](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) metric, which can be used to detect delays in the pipeline

    ## Brief change log

    - Publish `millisBehindLatest` as a gauge metric under the `Kinesis` group using `` as parameter
    - Updated metrics documentation

    ## Verifying this change

    This change is already covered by existing tests, such as `ShardConsumerTest`.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): no
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): no
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    - The S3 file system connector: no

    ## Documentation

    - Does this pull request introduce a new feature? yes
    - If yes, how is the feature documented? `metrics.md` file

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/casidiablo/flink kinesis-fork

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5182

commit bc8426ec3be364323d65cedcc1c1c5cb4e442c8b
Author: Cristian
Date:   2017-12-19T15:14:22Z

    Emit Kinesis' millisBehindLatest metric
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295892#comment-16295892 ] Cristian commented on FLINK-8162: - I will try to test and push a PR soon.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295843#comment-16295843 ] Furruska commented on FLINK-8162: - Can you share your fork with me? I'm interested in this but haven't been able to make it work.
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290737#comment-16290737 ] Cristian commented on FLINK-8162: - I'm running a fork of the kinesis connector that implements this. Should I PR this? Or is this not part of the roadmap at all?
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273688#comment-16273688 ] Cristian commented on FLINK-8162: - Bump [~tzulitai]