[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-13 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16325030#comment-16325030
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8162:


Merged for 1.5: 8e23264a4511d33723a756abc209c289fafbe97d.

Thanks for the contribution [~casidiablo]!

> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323953#comment-16323953
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5182


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323675#comment-16323675
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5182
  
Hi @casidiablo, just before merging, I made some refactoring of your PR to 
move the metrics reporting business out of the `ShardConsumer`: 
https://github.com/apache/flink/commit/14b01be8ec1c00b867aea8dc758c23fe247e05f4.

Let me know if you have any objections with that, if not I'll proceed to 
merge soon.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323447#comment-16323447
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5182
  
Thanks @casidiablo. Merging ...


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321497#comment-16321497
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841610
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup registerMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
+   .addGroup("shardId", 
shardState.getStreamShardHandle().getShard().getShardId());
--- End diff --

Great idea.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321495#comment-16321495
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841457
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

Ah I see, that makes sense then.

Could you then name it as `KinesisConsumer`, to be more consistent?


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321494#comment-16321494
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160841254
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup registerMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
+   .addGroup("shardId", 
shardState.getStreamShardHandle().getShard().getShardId());
--- End diff --

We probably should also add a `stream` group before `shardId`, since the 
Kinesis consumer allows subscribing to multiple streams.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320811#comment-16320811
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160763163
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

I think so. Just following the convention of other components (e.g. [kafka 
consumer](https://github.com/apache/flink/blob/00ad0eb120026817c79a5fed2c71dd6aa10e9ba6/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L101))


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319865#comment-16319865
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160610484
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

is this necessary?


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319866#comment-16319866
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160610346
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
--- End diff --

personal preference here: would prefer this to be named 
`registerMetricGroupForShard`.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316277#comment-16316277
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160139807
  
--- Diff: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
 ---
@@ -107,7 +107,7 @@ public MetricGroup addGroup(String key, String value) {
 
@Override
public Map getAllVariables() {
-   return Collections.emptyMap();
+   return new HashMap<>();
--- End diff --

I think it was before (tests were failing because I was using 
`...getAllVariables().put("shard_id")...`). I will revert and test.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316212#comment-16316212
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160132062
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -542,6 +545,16 @@ public int 
registerNewSubscribedShardState(KinesisStreamShardState newSubscribed
}
}
 
+   /**
+* Registers a metric group associated with the shard id of the 
provided {@link KinesisStreamShardState shardState}.
+*/
+   private MetricGroup buildMetricGroupForShard(KinesisStreamShardState 
shardState) {
+   return runtimeContext
+   .getMetricGroup()
+   .addGroup("Kinesis")
+   .addGroup("shard_id", 
shardState.getStreamShardHandle().getShard().getShardId());
--- End diff --

Is the underscore important? Would prefer shardId for consistency.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316211#comment-16316211
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160132296
  
--- Diff: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
 ---
@@ -107,7 +107,7 @@ public MetricGroup addGroup(String key, String value) {
 
@Override
public Map getAllVariables() {
-   return Collections.emptyMap();
+   return new HashMap<>();
--- End diff --

is this change necessary?


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316200#comment-16316200
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r160131414
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -70,35 +71,45 @@
 
private Date initTimestamp;
 
+   private long millisBehindLatest;
+
/**
 * Creates a shard consumer.
 *
 * @param fetcherRef reference to the owning fetcher
 * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
 * @param subscribedShard the shard this consumer is subscribed to
 * @param lastSequenceNum the sequence number in the shard to start 
consuming
+* @param kinesisMetricGroup the metric group to report to
 */
public ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
-   SequenceNumber lastSequenceNum) 
{
+   SequenceNumber lastSequenceNum,
+   MetricGroup kinesisMetricGroup) 
{
this(fetcherRef,
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
-   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
+   kinesisMetricGroup);
}
 
/** This constructor is exposed for testing purposes. */
protected ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
SequenceNumber 
lastSequenceNum,
-   KinesisProxyInterface 
kinesis) {
+   KinesisProxyInterface 
kinesis,
+   MetricGroup 
kinesisMetricGroup) {
this.fetcherRef = checkNotNull(fetcherRef);
this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
+
+   checkNotNull(kinesisMetricGroup)
+   .gauge("millisBehindLatest", () -> millisBehindLatest);
--- End diff --

yes you can use lambdas.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313653#comment-16313653
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r159950881
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -70,35 +71,45 @@
 
private Date initTimestamp;
 
+   private long millisBehindLatest;
+
/**
 * Creates a shard consumer.
 *
 * @param fetcherRef reference to the owning fetcher
 * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
 * @param subscribedShard the shard this consumer is subscribed to
 * @param lastSequenceNum the sequence number in the shard to start 
consuming
+* @param kinesisMetricGroup the metric group to report to
 */
public ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
-   SequenceNumber lastSequenceNum) 
{
+   SequenceNumber lastSequenceNum,
+   MetricGroup kinesisMetricGroup) 
{
this(fetcherRef,
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
-   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
+   kinesisMetricGroup);
}
 
/** This constructor is exposed for testing purposes. */
protected ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
SequenceNumber 
lastSequenceNum,
-   KinesisProxyInterface 
kinesis) {
+   KinesisProxyInterface 
kinesis,
+   MetricGroup 
kinesisMetricGroup) {
this.fetcherRef = checkNotNull(fetcherRef);
this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
+
+   checkNotNull(kinesisMetricGroup)
+   .gauge("millisBehindLatest", () -> millisBehindLatest);
--- End diff --

Bump 👊 


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301479#comment-16301479
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158496021
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -70,35 +71,45 @@
 
private Date initTimestamp;
 
+   private long millisBehindLatest;
+
/**
 * Creates a shard consumer.
 *
 * @param fetcherRef reference to the owning fetcher
 * @param subscribedShardStateIndex the state index of the shard this 
consumer is subscribed to
 * @param subscribedShard the shard this consumer is subscribed to
 * @param lastSequenceNum the sequence number in the shard to start 
consuming
+* @param kinesisMetricGroup the metric group to report to
 */
public ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
-   SequenceNumber lastSequenceNum) 
{
+   SequenceNumber lastSequenceNum,
+   MetricGroup kinesisMetricGroup) 
{
this(fetcherRef,
subscribedShardStateIndex,
subscribedShard,
lastSequenceNum,
-   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()));
+   
KinesisProxy.create(fetcherRef.getConsumerConfiguration()),
+   kinesisMetricGroup);
}
 
/** This constructor is exposed for testing purposes. */
protected ShardConsumer(KinesisDataFetcher fetcherRef,
Integer 
subscribedShardStateIndex,
StreamShardHandle 
subscribedShard,
SequenceNumber 
lastSequenceNum,
-   KinesisProxyInterface 
kinesis) {
+   KinesisProxyInterface 
kinesis,
+   MetricGroup 
kinesisMetricGroup) {
this.fetcherRef = checkNotNull(fetcherRef);
this.subscribedShardStateIndex = 
checkNotNull(subscribedShardStateIndex);
this.subscribedShard = checkNotNull(subscribedShard);
this.lastSequenceNum = checkNotNull(lastSequenceNum);
+
+   checkNotNull(kinesisMetricGroup)
+   .gauge("millisBehindLatest", () -> millisBehindLatest);
--- End diff --

Will this work? Am I allowed to use lambdas? What Java version has to be 
supported?


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299527#comment-16299527
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158194237
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
SequenceNumber 
lastSequenceNum,
KinesisProxyInterface 
kinesis) {
this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
--- End diff --

I think we can't register the metric in `FlinkKinesisConsumer`, since we 
need it to be associated with a particular shard id. But I could do it from the 
`KinesisDataFetcher` instead, which already has access to the runtime context.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299514#comment-16299514
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158192923
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Kinesis Connectors
+
+  
+
+  Scope
+  Metrics
+  Description
+  Type
+
+  
+  
+
+  Operator
+  millisBehindLatest
+  The number of milliseconds the GetRecords response is 
from the tip of the stream,
--- End diff --

I'm OK changing it. That's actually just a copy&past from Amazon docs.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299238#comment-16299238
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158161267
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
SequenceNumber 
lastSequenceNum,
KinesisProxyInterface 
kinesis) {
this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

Would be best if @zentol also comments on this.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299236#comment-16299236
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158159640
  
--- Diff: docs/monitoring/metrics.md ---
@@ -1293,6 +1293,29 @@ Thus, in order to infer the metric identifier:
   
 
 
+ Kinesis Connectors
+
+  
+
+  Scope
+  Metrics
+  Description
+  Type
+
+  
+  
+
+  Operator
+  millisBehindLatest
+  The number of milliseconds the GetRecords response is 
from the tip of the stream,
--- End diff --

Just a matter of preference here: I prefer the term "head of the stream" 
instead of tip.
You can ignore this if you disagree.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299240#comment-16299240
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158161564
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
SequenceNumber 
lastSequenceNum,
KinesisProxyInterface 
kinesis) {
this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
--- End diff --

I feel that passing the `StreamingRuntimeContext` all the way here just to 
register metrics, is not a good idea.
Is it possible we register the metrics in `FlinkKinesisConsumer` instead? 
That also makes it more visible what metrics the consumer exposes without 
having to dig all the way to this internal `ShardConsumer` thread.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16299237#comment-16299237
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r158160866
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -96,6 +99,15 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
SequenceNumber 
lastSequenceNum,
KinesisProxyInterface 
kinesis) {
this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis")
--- End diff --

Do we really need this group? The metric is already bounded to the current 
subtask, which should provide enough context that it is Kinesis-related since 
we're the Kinesis consumer, no?


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297661#comment-16297661
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r157907259
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher 
fetcherRef,
SequenceNumber 
lastSequenceNum,
KinesisProxyInterface 
kinesis) {
this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis");
+   kinesisMetricGroup
--- End diff --

use `addGroup("shard_id", subscribedShard.getShard().getShardId())` instead 
and register the metric on the returned group.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297662#comment-16297662
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r157907331
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher 
fetcherRef,
SequenceNumber 
lastSequenceNum,
KinesisProxyInterface 
kinesis) {
this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis");
+   kinesisMetricGroup
+   .getAllVariables()
+   .put("", 
subscribedShard.getShard().getShardId());
+
+   kinesisMetricGroup.gauge("millisBehindLatest", (Gauge) () 
-> millisBehindLatest);
--- End diff --

the cast shouldn't be necessary


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16296973#comment-16296973
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

GitHub user casidiablo opened a pull request:

https://github.com/apache/flink/pull/5182

[FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehindLatest metric

## What is the purpose of the change

- Emits [Kinesis' 
millisBehindLatest](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)
 metric, which can be used to detect delays in the pipeline

## Brief change log

  - Publish `millisBehindLatest` as a gauge metric under the `Kinesis` 
group using `` as parameter
  - Updated metrics documentation


## Verifying this change

This change is already covered by existing tests, such as 
`ShardConsumerTest`.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? `metrics.md` file


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/casidiablo/flink kinesis-fork

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5182


commit bc8426ec3be364323d65cedcc1c1c5cb4e442c8b
Author: Cristian 
Date:   2017-12-19T15:14:22Z

Emit Kinesis' millisBehindLatest metric




> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-18 Thread Cristian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295892#comment-16295892
 ] 

Cristian commented on FLINK-8162:
-

I will try to test and push a PR soon.

> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-18 Thread Furruska (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16295843#comment-16295843
 ] 

Furruska commented on FLINK-8162:
-

Can you share your fork with me? I'm interested in this but haven't been able 
to make it work.

> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-14 Thread Cristian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290737#comment-16290737
 ] 

Cristian commented on FLINK-8162:
-

I'm running a fork of the kinesis connector that implements this. Should I PR 
this? Or is this not part of the roadmap at all?

> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-11-30 Thread Cristian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16273688#comment-16273688
 ] 

Cristian commented on FLINK-8162:
-

Bump [~tzulitai]

> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)