[jira] [Commented] (FLINK-2144) Incremental count, average, and variance for windows

2016-06-29 Thread Wenlong Lyu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354715#comment-15354715
 ] 

Wenlong Lyu commented on FLINK-2144:


Our solution is mostly based on this paper: 
http://www.vldb.org/pvldb/vol8/p702-tangwongsan.pdf

> Incremental count, average, and variance for windows
> 
>
> Key: FLINK-2144
> URL: https://issues.apache.org/jira/browse/FLINK-2144
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: statistics
>
> By count I mean the number of elements in the window.
> These can be implemented very efficiently building on FLINK-2143:
> Store: O(1)
> Evict: O(1)
> emitWindow: O(1)
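
For illustration, here is a minimal sketch of the O(1) bookkeeping described
above, using a naive running-sums formulation (this is not Flink's actual
implementation; the cited paper discusses numerically stabler incremental
schemes):

    // Sketch only: count, sum, and sum of squares make store, evict,
    // and emitWindow all constant-time.
    public class IncrementalWindowStats {
        private long count;
        private double sum;
        private double sumSquares;

        public void store(double x) { count++; sum += x; sumSquares += x * x; }

        public void evict(double x) { count--; sum -= x; sumSquares -= x * x; }

        public long count() { return count; }

        public double average() { return sum / count; }

        // Population variance via E[X^2] - E[X]^2; this form can lose
        // precision for large values, which is why stabler schemes exist.
        public double variance() {
            double mean = sum / count;
            return sumSquares / count - mean * mean;
        }
    }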



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2150: [FLINK-3580] [table] Reintroduce Date/Time and implement ...

2016-06-29 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2150
  
Merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3580) Reintroduce Date/Time and implement scalar functions for it

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354732#comment-15354732
 ] 

ASF GitHub Bot commented on FLINK-3580:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/2150
  
Merging...


> Reintroduce Date/Time and implement scalar functions for it
> ---
>
> Key: FLINK-3580
> URL: https://issues.apache.org/jira/browse/FLINK-3580
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> This task includes:
> {code}
> DATETIME_PLUS
> EXTRACT_DATE
> FLOOR
> CEIL
> CURRENT_TIME
> CURRENT_TIMESTAMP
> LOCALTIME
> LOCALTIMESTAMP
> {code}
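
These are Calcite's internal operator names; at the SQL level they surface
roughly as shown below (a hedged sketch against an assumed registered table
"orders" with a timestamp field "ts"; the exact supported syntax should be
checked against the merged PR):

    // Illustrative only: assumes a TableEnvironment "tableEnv" with a
    // registered table "orders" that has a timestamp field "ts".
    Table result = tableEnv.sql(
        "SELECT EXTRACT(YEAR FROM ts), FLOOR(ts TO HOUR), CURRENT_TIMESTAMP " +
        "FROM orders");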



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2150: [FLINK-3580] [table] Reintroduce Date/Time and imp...

2016-06-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2150


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3580) Reintroduce Date/Time and implement scalar functions for it

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354740#comment-15354740
 ] 

ASF GitHub Bot commented on FLINK-3580:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2150


> Reintroduce Date/Time and implement scalar functions for it
> ---
>
> Key: FLINK-3580
> URL: https://issues.apache.org/jira/browse/FLINK-3580
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> This task includes:
> {code}
> DATETIME_PLUS
> EXTRACT_DATE
> FLOOR
> CEIL
> CURRENT_TIME
> CURRENT_TIMESTAMP
> LOCALTIME
> LOCALTIMESTAMP
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-3152) Support all comparisons for Date type

2016-06-29 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-3152:
---

Assignee: Timo Walther

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3152) Support all comparisons for Date type

2016-06-29 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-3152.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed by FLINK-3580.

> Support all comparisons for Date type
> -
>
> Key: FLINK-3152
> URL: https://issues.apache.org/jira/browse/FLINK-3152
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
> Fix For: 1.1.0
>
>
> Currently, the Table API does not support comparisons like "DATE < DATE", 
> "DATE >= DATE". The ExpressionCodeGenerator needs to be adapted for that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4130) CallGenerator could generate illegal code when taking no operands

2016-06-29 Thread Cody (JIRA)
Cody created FLINK-4130:
---

 Summary: CallGenerator could generate illegal code when taking no 
operands
 Key: FLINK-4130
 URL: https://issues.apache.org/jira/browse/FLINK-4130
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.1.0
Reporter: Cody
Priority: Minor


In CallGenerator, when a call takes no operands and null checking is enabled, it 
will generate code like:

boolean isNull$17 = ;

which will fail to compile at runtime.
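
For illustration, a guard of the kind the fix needs (a hedged Java sketch;
the actual CallGenerator lives in the Table API's Scala code generation, and
these names are made up):

    import java.util.List;

    public class NullTermCodeGenSketch {
        // The null flag is normally built by OR-ing the operands' null
        // terms; with no operands the right-hand side must fall back to
        // a literal instead of an empty string.
        static String nullTermAssignment(String flag, List<String> operandNullTerms) {
            String rhs = operandNullTerms.isEmpty()
                ? "false" // no operands, so the result can never be null
                : String.join(" || ", operandNullTerms);
            return "boolean " + flag + " = " + rhs + ";";
        }
    }

With an empty operand list this emits `boolean isNull$17 = false;` instead of 
the illegal assignment above.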



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-...

2016-06-29 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2180

[FLINK-4080][kinesis-connector] Guarantee exactly-once for Kinesis consumer 
when fail in middle of aggregated records

If multiple Kinesis records were aggregated into a single record by KPL, 
when deaggregated at the consumer, all deaggregated subrecords will have the 
same sequence number. This breaks the exactly-once guarantee of the 
`FlinkKinesisConsumer` if it happens to fail while we are still in the middle 
of processing a deaggregated record (the first record's sequence number will 
incorrectly mark the whole batch of aggregated records as processed).

To fix this, this PR changes the snapshot state type of 
`FlinkKinesisConsumer` from `HashMap<KinesisStreamShard, String>` to 
`HashMap<KinesisStreamShard, SequenceNumber>`.
`SequenceNumber` is a new class to represent a combination of a "main 
sequence number" and a "subsequence number". When the `ShardConsumerThread` 
starts consuming records, we check if the last record after restore was an 
aggregated record. If yes, we first handle the dangling subrecords.
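
For illustration, a minimal sketch of what such a `SequenceNumber` might look
like (field and method names are assumptions, not necessarily the PR's actual
code):

    public class SequenceNumber {
        private final String sequenceNumber;  // main Kinesis sequence number
        private final long subSequenceNumber; // -1 if the record was not aggregated

        public SequenceNumber(String sequenceNumber) {
            this(sequenceNumber, -1L);
        }

        public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
            this.sequenceNumber = sequenceNumber;
            this.subSequenceNumber = subSequenceNumber;
        }

        /** True if this position points into the middle of an aggregated record. */
        public boolean isAggregated() {
            return subSequenceNumber >= 0;
        }

        public String getSequenceNumber() { return sequenceNumber; }
        public long getSubSequenceNumber() { return subSequenceNumber; }
    }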

@rmetzger I'm adding this change to the original Kinesis connector in 
`master` instead of waiting for the big PR #2131 to be merged, because I think 
this is a bug we must fix before 1.1, and I'm not sure if #2131 will be merged 
before the RC for 1.1 comes out. Depending on whether #2131 or this PR gets 
merged first, I'll rebase the other one accordingly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4080

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2180


commit a9593677a4c73c9475b1e85204002d6470f2115a
Author: Gordon Tai 
Date:   2016-06-29T07:46:35Z

[FLINK-4080] Guarantee exactly-once for Kinesis consumer for failures in 
the middle of aggregated records




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-once fo...

2016-06-29 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2180
  
Thanks a lot for fixing this. I'll try to review it in the next few hours.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354779#comment-15354779
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2180

[FLINK-4080][kinesis-connector] Guarantee exactly-once for Kinesis consumer 
when fail in middle of aggregated records

If multiple Kinesis records were aggregated into a single record by KPL, 
when deaggregated at the consumer, all deaggregated subrecords will have the 
same sequence number. This breaks the exactly-once guarantee of the 
`FlinkKinesisConsumer` if it happens to fail while we are still in the middle 
of processing a deaggregated record (the first record's sequence number will 
incorrectly mark the whole batch of aggregated records as processed).

To fix this, this PR changes the snapshot state type of 
`FlinkKinesisConsumer` from `HashMap<KinesisStreamShard, String>` to 
`HashMap<KinesisStreamShard, SequenceNumber>`.
`SequenceNumber` is a new class to represent a combination of a "main 
sequence number" and a "subsequence number". When the `ShardConsumerThread` 
starts consuming records, we check if the last record after restore was an 
aggregated record. If yes, we first handle the dangling subrecords.

@rmetzger I'm adding this change to the original Kinesis connector in 
`master` instead of waiting for the big PR #2131 to be merged, because I think 
this is a bug we must fix before 1.1, and I'm not sure if #2131 will be merged 
before the RC for 1.1 comes out. Depending on whether #2131 or this PR gets 
merged first, I'll rebase the other one accordingly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4080

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2180


commit a9593677a4c73c9475b1e85204002d6470f2115a
Author: Gordon Tai 
Date:   2016-06-29T07:46:35Z

[FLINK-4080] Guarantee exactly-once for Kinesis consumer for failures in 
the middle of aggregated records




> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally seen the ManualExactlyOnceTest fail after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may point into a 
> de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 
> 8th sub-record of the 5th record was last seen for shard 0. On restore, we 
> start again from record 5 for shard 0 and skip the first 7 sub-records; 
> however, for shard 1 we start from record 3 since record 2 is non-aggregated 
> and already fully processed.
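
A sketch of the restore behavior described in point 2 (class and method names 
are stand-ins, not the connector's actual code):

    import java.util.List;

    public class RestoreSkipSketch {
        static class SubRecord {
            final int subSequenceNumber; // 1-based position within the aggregated record
            SubRecord(int n) { this.subSequenceNumber = n; }
        }

        // Per the description above: with state "5:8", re-read record 5 and
        // skip the first 7 sub-records, resuming from sub-record 8.
        static void resume(List<SubRecord> deaggregated, int lastSeenSubSequence) {
            for (SubRecord r : deaggregated) {
                if (r.subSequenceNumber < lastSeenSubSequence) {
                    continue; // already fully processed before the failure
                }
                emit(r);
            }
        }

        static void emit(SubRecord r) { /* hand off to the Flink source context */ }
    }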



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354781#comment-15354781
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2180
  
Thanks a lot for fixing this. I'll try to review it in the next few hours.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally seen the ManualExactlyOnceTest fail after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may point into a 
> de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 
> 8th sub-record of the 5th record was last seen for shard 0. On restore, we 
> start again from record 5 for shard 0 and skip the first 7 sub-records; 
> however, for shard 1 we start from record 3 since record 2 is non-aggregated 
> and already fully processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68904419
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

That's a very good point, Greg. I think we didn't. As it stands, the large 
record handler seems to have known issues. Personally, I would therefore 
discourage people from using it, but it totally makes sense to add a paragraph 
about what it actually does.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354782#comment-15354782
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68904419
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

That's a very good point, Greg. I think we didn't. As it stands, the large 
record handler seems to have known issues. Personally, I would therefore 
discourage people from using it, but it totally makes sense to add a paragraph 
about what it actually does.


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check whether there are any breaking API 
> changes and whether the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger reassigned FLINK-4127:
-

Assignee: Robert Metzger

> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check whether there are any breaking API 
> changes and whether the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger updated FLINK-4127:
--
Component/s: Build System

> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check whether there are any breaking API 
> changes and whether the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68904562
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received 
you could use a `Counter` like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+A `Counter` is used to count something. The current value can be incremented or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own `Counter` 
implementation along with the name.
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper that 
allows usage of Codahale/DropWizard Histograms 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either 
`MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>\<user-scope\>\<name\>.
+
+The system scope allows the reported name to contain contextual 
information like the name of job it was registered in without requiring the 
user to pass this information manually.
+
+How the system scope affects the reported name for a metric can be 
modified by setting the following keys in the flink-conf.yaml. 
+Each of these keys expects a format string that may contain constants (e.g. 
"taskmanager") and variables (e.g. "\<host\>") which will be replaced at 
runtime.
+
+- `metrics.scope.jm`
+  - Default: \<host\>.jobmanager
+- `metrics.scope.jm.job`
+  - Default: \<host\>.jobmanager.\<job_name\>
+- `metrics.scope.tm`
+  - Default: \<host\>.taskmanager.\<tm_id\>
+- `metrics.scope.tm.job`
+  - Default: \<host\>.taskmanager.\<tm_id\>.\<job_name\>
+- `metrics.scope.task`
+  - Default: 
\<host\>.taskmanager.\<tm_id\>.\<job_name\>.\<task_name\>.\<subtask_index\>
+- `metrics.scope.operator`
+  - Default: 
\<host\>.taskmanager.\<tm_id\>.\<job_name\>.\<operator_name\>.\<subtask_index\>
+
+The following is a list of all variables available:
+
+- JobManager: \<host\>
+- TaskManager: \<host\>, \<tm_id\>
+- Job: \<job_id\>, \<job_name\>
+- Task: \<task_id\>, \<task_name\>, \<task_attempt_id\>, \<task_attempt_num\>, 
\<subtask_index\>
+- Operator: \<operator_name\>, \<subtask_index\>
+
+There is no restriction on the order of variables.
+
+Which format is applied to a metric depends on which entities it was 
scoped to.
+The `metrics.scope.operator` format will be applied to all metrics that 
were scoped to an operator.
+The `metrics.scope.tm.job` format will be applied to all metrics that were 
only scoped to a job and a taskmanager.
+
+This allows you to define explicitly how metrics on ev
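
To complement the quoted docs, a minimal Gauge example along the lines they
describe (a sketch; the class name and counting logic are illustrative):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;

    public class MyGaugeMapper extends RichMapFunction<String, String> {
        private int valuesSeen = 0;

        @Override
        public void open(Configuration config) {
            // A Gauge reports a value on demand; here it exposes a plain field.
            getRuntimeContext().getMetricGroup()
                .gauge("myGauge", new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        return valuesSeen;
                    }
                });
        }

        @Override
        public String map(String value) throws Exception {
            valuesSeen++;
            return value;
        }
    }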

[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354784#comment-15354784
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68904562
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows gathering and exposing metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a `MetricGroup` object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received 
you could use a `Counter` like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports `Counters`, `Gauges` and `Histograms`.
+
+A `Counter` is used to count something. The current value can be incremented or 
decremented using `inc()/inc(long n)` or `dec()/dec(long n)`.
+You can create and register a `Counter` by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own `Counter` 
implementation along with the name.
+
+A `Gauge` provides a value of any type on demand. In order to use a 
`Gauge` you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A `Histogram` measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper that 
allows usage of Codahale/DropWizard Histograms 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered 
in a user function will be scoped to the operator in which the function runs, 
the task/job it belongs to and the taskManager/host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling either 
`MetricGroup#addGroup(String name)` or `MetricGroup#addGroup(int name)`.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>\<user-scope\>\<name\>.
+
+The system scope allows the reported name to contain contextual 
information like the name of job it was registered in without requiring the 
user to pass this information manually.
+
+How the system scope affects the reported name for a metric can be 
modified by setting the following keys in the flink-conf.yaml. 
+Each of these keys expects a format string that may contain constants (e.g. 
"taskmanager") and variables (e.g. "\<host\>") which will be replaced at 
runtime.
+
+- `metrics.scope.jm`
+  - Default: \<host\>.jobmanager
+- `metrics.scope.jm.job`
+  - Default: \<host\>.jobmanager.\<job_name\>
+- `metrics.scope.tm`
+  - Default: \<host\>.taskmanager.\<tm_id\>
+- `metrics.scope.tm.job`
+  - Default: \<host\>.taskmanager.\<tm_id\>.\<job_name\>
+- `metrics.scope.task`
+  - Default: 
\<host\>.taskmanager.\<tm_id\>.\<job_name\>.\<task_name\>.\<subtask_index\>
+- `metrics.scope.operator`
+  - Default: 
\<host\>.taskmanager.\<tm_id\>.\<job_name\>.\<operator_name\>.\<subtask_index\>
+
+The following is a list of all variables available:
+
+- JobManager: \<host\>
+- TaskManager: \<host\>, \<tm_id\>
+- Job: \<job_id\>, \<job_name\>
+- Task: \<task_id\>, \<task_name\>, \<task_attempt_id\>, \<task_attempt_num\>, 
\<subtask_index\>
+- Operator: \<operator_name\>, \<subtask_index\>
+
+There is no restriction on the order of variables.
+
+Which format is applied to a metric depends on which entities it was 
scoped to.
+The `metrics.scope.operator` fo
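
Similarly, the DropWizard wrapper mentioned in the quoted docs might be used
like this (a sketch; it assumes the flink-metrics-dropwizard dependency, and
the wrapper's fully qualified class name follows the docs' text):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Histogram;

    import com.codahale.metrics.SlidingWindowReservoir;

    public class MyHistogramMapper extends RichMapFunction<Long, Long> {
        private Histogram histogram;

        @Override
        public void open(Configuration config) {
            // Wrap a Codahale/DropWizard histogram so Flink can report it.
            com.codahale.metrics.Histogram dropwizard = new com.codahale.metrics.Histogram(
                new SlidingWindowReservoir(500));
            this.histogram = getRuntimeContext().getMetricGroup().histogram(
                "myHistogram",
                new org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper(dropwizard));
        }

        @Override
        public Long map(Long value) throws Exception {
            this.histogram.update(value); // record one long sample
            return value;
        }
    }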

[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68904915
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows users to gather and expose metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling getRuntimeContext().getMetricGroup().
+This method returns a MetricGroup object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received 
you could use a Counter like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports Counters, Gauges and Histograms.
+
+A Counter is used to count something. The current value can be incremented or 
decremented using `inc([long n])` or `dec([long n])`.
+You can create and register a Counter by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own Counter 
implementation along with the name.
+
+A Gauge provides a value of any type on demand. In order to use a Gauge 
you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A Histogram measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper that 
allows usage of Codahale/DropWizard Histograms 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered in 
a user function will be scoped to the Operator in which the function runs, the 
Task/Job it belongs to and the TaskManager/Host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling the 
MetricGroup#addGroup((int/String) name) method.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>\<user-scope\>\<name\>.
+
+The system scope allows the reported name to contain contextual 
information like the name of job it was registered in without requiring the 
user to pass this information manually.
+
+How the system scope affects the reported name for a metric can be 
modified by setting the following keys in the flink-conf.yaml. 
+Each of these keys expects a format string that may contain constants (e.g. 
"taskmanager") and variables (e.g. "\<host\>") which will be replaced at 
runtime.
--- End diff --

I agree that we should mirror the config keys to the configuration page as 
well, and move the config keys.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-4033) Missing Scala example snippets for the Kinesis Connector documentation

2016-06-29 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-4033:
--

Assignee: Tzu-Li (Gordon) Tai

> Missing Scala example snippets for the Kinesis Connector documentation
> --
>
> Key: FLINK-4033
> URL: https://issues.apache.org/jira/browse/FLINK-4033
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.1.0
>
>
> The documentation for the Kinesis connector is missing Scala version of the 
> example snippets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354789#comment-15354789
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68904915
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows users to gather and expose metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling getRuntimeContext().getMetricGroup().
+This method returns a MetricGroup object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received 
you could use a Counter like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports Counters, Gauges and Histograms.
+
+A Counter is used to count something. The current value can be incremented or 
decremented using `inc([long n])` or `dec([long n])`.
+You can create and register a Counter by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own Counter 
implementation along with the name.
+
+A Gauge provides a value of any type on demand. In order to use a Gauge 
you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A Histogram measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper that 
allows usage of Codahale/DropWizard Histograms 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`).
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default a metric that is registered in 
a user function will be scoped to the Operator in which the function runs, the 
Task/Job it belongs to and the TaskManager/Host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling the 
MetricGroup#addGroup((int/String) name) method.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>\<user-scope\>\<name\>.
+
+The system scope allows the reported name to contain contextual 
information like the name of job it was registered in without requiring the 
user to pass this information manually.
+
+How the system scope affects the reported name for a metric can be 
modified by setting the following keys in the flink-conf.yaml. 
+Each of these keys expects a format string that may contain constants (e.g. 
"taskmanager") and variables (e.g. "\<host\>") which will be replaced at 
runtime.
--- End diff --

I agree that we should mirror the config keys to the configuration page as 
well, and move the config keys.


> Document metrics
> 
>
> Key: FLINK-4116
> URL: https://issues.apache.org/jira/browse/FLINK-4116
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> The metric system is currently not documented, which should be fixed before 
> the 1.1 release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-...

2016-06-29 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68905535
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

This file is missing the license header


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905457
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
--- End diff --

IMHO the `containered` prefix reflects the meaning of this config key 
better than `container`. It cuts off a fraction from the containerized Flink 
process, not from the container itself, which the latter would imply.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354800#comment-15354800
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905457
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
--- End diff --

IMHO the `containered` prefix reflects the meaning of this config key 
better than `container`. It cuts off a fraction from the containerized Flink 
process, not from the container itself, which the latter would imply.


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check whether there are any breaking API 
> changes and whether the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905471
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
 
/**
 * Minimum amount of heap memory to remove in containers, as a safety 
margin.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_MIN = 
"containered.heap-cutoff-min";
+   public static final String CONTAINER_HEAP_CUTOFF_MIN = 
"container.heap-cutoff-min";
--- End diff --

Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354801#comment-15354801
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68905535
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

This file is missing the license header


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally seen the ManualExactlyOnceTest fail after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may point into a 
> de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 
> 8th sub-record of the 5th record was last seen for shard 0. On restore, we 
> start again from record 5 for shard 0 and skip the first 7 sub-records; 
> however, for shard 1 we start from record 3 since record 2 is non-aggregated 
> and already fully processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354802#comment-15354802
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905471
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
 
/**
 * Minimum amount of heap memory to remove in containers, as a safety 
margin.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_MIN = 
"containered.heap-cutoff-min";
+   public static final String CONTAINER_HEAP_CUTOFF_MIN = 
"container.heap-cutoff-min";
--- End diff --

Same as above.


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check whether there are any breaking API 
> changes and whether the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905882
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
 
/**
 * Minimum amount of heap memory to remove in containers, as a safety 
margin.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_MIN = 
"containered.heap-cutoff-min";
+   public static final String CONTAINER_HEAP_CUTOFF_MIN = 
"container.heap-cutoff-min";
 
/**
 * Prefix for passing custom environment variables to Flink's master 
process.
 * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
 * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 * in the flink-conf.yaml.
 */
-   public static final String CONTAINERED_MASTER_ENV_PREFIX = 
"containered.application-master.env.";
+   public static final String CONTAINER_MASTER_ENV_PREFIX = 
"container.application-master.env.";
 
/**
-* Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this 
configuration prefix allows
+* Similar to the {@see CONTAINER_MASTER_ENV_PREFIX}, this 
configuration prefix allows
 * setting custom environment variables for the workers (TaskManagers)
 */
-   public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = 
"containered.taskmanager.env.";
+   public static final String CONTAINER_TASK_MANAGER_ENV_PREFIX = 
"container.taskmanager.env.";
--- End diff --

How about we rename the config string to `container.worker.env`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905853
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
 
/**
 * Minimum amount of heap memory to remove in containers, as a safety 
margin.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_MIN = 
"containered.heap-cutoff-min";
+   public static final String CONTAINER_HEAP_CUTOFF_MIN = 
"container.heap-cutoff-min";
 
/**
 * Prefix for passing custom environment variables to Flink's master 
process.
 * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
 * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 * in the flink-conf.yaml.
 */
-   public static final String CONTAINERED_MASTER_ENV_PREFIX = 
"containered.application-master.env.";
+   public static final String CONTAINER_MASTER_ENV_PREFIX = 
"container.application-master.env.";
--- End diff --

How about we rename the config string to `container.master.env`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354806#comment-15354806
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905882
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
 
/**
 * Minimum amount of heap memory to remove in containers, as a safety 
margin.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_MIN = 
"containered.heap-cutoff-min";
+   public static final String CONTAINER_HEAP_CUTOFF_MIN = 
"container.heap-cutoff-min";
 
/**
 * Prefix for passing custom environment variables to Flink's master 
process.
 * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
 * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 * in the flink-conf.yaml.
 */
-   public static final String CONTAINERED_MASTER_ENV_PREFIX = 
"containered.application-master.env.";
+   public static final String CONTAINER_MASTER_ENV_PREFIX = 
"container.application-master.env.";
 
/**
-* Similar to the {@see CONTAINERED_MASTER_ENV_PREFIX}, this 
configuration prefix allows
+* Similar to the {@see CONTAINER_MASTER_ENV_PREFIX}, this 
configuration prefix allows
 * setting custom environment variables for the workers (TaskManagers)
 */
-   public static final String CONTAINERED_TASK_MANAGER_ENV_PREFIX = 
"containered.taskmanager.env.";
+   public static final String CONTAINER_TASK_MANAGER_ENV_PREFIX = 
"container.taskmanager.env.";
--- End diff --

How about we rename the config string to `container.worker.env`?


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check whether there are any breaking API 
> changes and whether the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2177: [FLINK-4127] Check API compatbility for 1.1 in flink-core

2016-06-29 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2177
  
Thanks for updating the documentation! I've made some suggestions regarding 
the names of the new configuration keys.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-...

2016-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68906196
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

Sorry, fixed. I wonder why I don't get the Maven error when I build locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354805#comment-15354805
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68905853
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -269,26 +269,26 @@
 * Percentage of heap space to remove from containers (YARN / Mesos), 
to compensate
 * for other JVM memory usage.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_RATIO = 
"containered.heap-cutoff-ratio";
+   public static final String CONTAINER_HEAP_CUTOFF_RATIO = 
"container.heap-cutoff-ratio";
 
/**
 * Minimum amount of heap memory to remove in containers, as a safety 
margin.
 */
-   public static final String CONTAINERED_HEAP_CUTOFF_MIN = 
"containered.heap-cutoff-min";
+   public static final String CONTAINER_HEAP_CUTOFF_MIN = 
"container.heap-cutoff-min";
 
/**
 * Prefix for passing custom environment variables to Flink's master 
process.
 * For example for passing LD_LIBRARY_PATH as an env variable to the 
AppMaster, set:
 * yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 * in the flink-conf.yaml.
 */
-   public static final String CONTAINERED_MASTER_ENV_PREFIX = 
"containered.application-master.env.";
+   public static final String CONTAINER_MASTER_ENV_PREFIX = 
"container.application-master.env.";
--- End diff --

How about we rename the config string to `container.master.env`?


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354810#comment-15354810
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2177
  
Thanks for updating the documentation! I've made some suggestions regarding 
the names of the new configuration keys.


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354808#comment-15354808
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68906196
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

Sorry, fixed. I wonder why I don't get the Maven error when I build locally?


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may be a de-aggregated 
> shard, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record 
> of the 5th record was last seen for shard 0. On restore, we start again from 
> record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 
> we start from record 3 since record 2 is non-aggregated and already fully 
> processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
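
A minimal sketch of the checkpoint state shape described in the proposed fix,
assuming a hypothetical value class (names here are illustrative; the actual
class in PR #2180 may differ):

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical holder for the last-seen position of a shard: a Kinesis
    // sequence number plus an optional sub-sequence number for records that
    // came out of a de-aggregated batch.
    public class ShardPosition implements Serializable {
        private final String sequenceNumber;
        private final long subSequenceNumber; // -1 for non-aggregated records

        public ShardPosition(String sequenceNumber, long subSequenceNumber) {
            this.sequenceNumber = sequenceNumber;
            this.subSequenceNumber = subSequenceNumber;
        }

        public boolean isAggregated() {
            return subSequenceNumber >= 0;
        }

        @Override
        public String toString() {
            // "5:8" means the 8th sub-record of the 5th record, as in the
            // {"shard0" -> "5:8", "shard1" -> "2"} example above
            return isAggregated()
                ? sequenceNumber + ":" + subSequenceNumber
                : sequenceNumber;
        }
    }

    // Per-shard state as in the example above:
    //   Map<String, ShardPosition> state = new HashMap<>();
    //   state.put("shard0", new ShardPosition("5", 8));
    //   state.put("shard1", new ShardPosition("2", -1));

On restore, a position with isAggregated() == true tells the consumer to
re-read that sequence number and skip the already-processed sub-records.
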


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68906518
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

@aljoscha, can you explain what the large record handler is doing?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-...

2016-06-29 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68906667
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

Did you do "mvn verify" or another phase? I think the plugin is executed on 
verify


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354814#comment-15354814
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68906667
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

Did you do "mvn verify" or another phase? I think the plugin is executed on 
verify


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may be a de-aggregated 
> shard, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record 
> of the 5th record was last seen for shard 0. On restore, we start again from 
> record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 
> we start from record 3 since record 2 is non-aggregated and already fully 
> processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4126) Unstable test ZooKeeperLeaderElectionTest.testZooKeeperReelection

2016-06-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4126.

Resolution: Fixed

Fixed via dbe41f48745bf52458291290bd2aa0fdc8d18560

> Unstable test ZooKeeperLeaderElectionTest.testZooKeeperReelection
> -
>
> Key: FLINK-4126
> URL: https://issues.apache.org/jira/browse/FLINK-4126
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> Here is an instance of the failure:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558301/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4126) Unstable test ZooKeeperLeaderElectionTest.testZooKeeperReelection

2016-06-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-4126:


Assignee: Till Rohrmann

> Unstable test ZooKeeperLeaderElectionTest.testZooKeeperReelection
> -
>
> Key: FLINK-4126
> URL: https://issues.apache.org/jira/browse/FLINK-4126
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: test-stability
>
> Here is an instance of the failure:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558301/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4126) Unstable test ZooKeeperLeaderElectionTest.testZooKeeperReelection

2016-06-29 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354812#comment-15354812
 ] 

Till Rohrmann commented on FLINK-4126:
--

The problem is that the test case times out. Increasing the timeout and 
decreasing the number of reelections should harden the test case.

> Unstable test ZooKeeperLeaderElectionTest.testZooKeeperReelection
> -
>
> Key: FLINK-4126
> URL: https://issues.apache.org/jira/browse/FLINK-4126
> Project: Flink
>  Issue Type: Bug
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Here is an instance of the failure:
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/140558301/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354813#comment-15354813
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68906518
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

@aljoscha, can you explain what the large record handler is doing?


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68907744
  
--- Diff: docs/setup/config.md ---
@@ -107,12 +109,20 @@ Please make sure to set the maximum ticket life span 
high long running jobs. The
 
 If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
 
+### Resource Manager
+
+- `resourcemanager.rpc.port`: The config parameter defining the network 
port to connect to for communication with the resource manager.
+
 ### Other
 
 - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of 
directories separated by the systems directory delimiter (for example ':' 
(colon) on Linux/Unix). If multiple directories are specified, then the 
temporary files will be distributed across the directories in a round-robin 
fashion. The I/O manager component will spawn one reading and one writing 
thread per directory. A directory may be listed multiple times to have the I/O 
manager use multiple threads for it (for example if it is physically stored on 
a very fast disc or RAID) (DEFAULT: The system's tmp dir).
 
+- `taskmanager.log.path`: The config parameter defining the taskmanager 
log file location
+
 - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 
8081).
 
+- `jobmanager.web.tmpdir`: This configuration parameter allows defining 
the Flink web directory to be used by the web interface.
--- End diff --

what is a web directory?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68907634
  
--- Diff: docs/setup/config.md ---
@@ -107,12 +109,20 @@ Please make sure to set the maximum ticket life span 
high long running jobs. The
 
 If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
 
+### Resource Manager
+
+- `resourcemanager.rpc.port`: The config parameter defining the network 
port to connect to for communication with the resource manager.
--- End diff --

Is it possible to specify a port range here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354827#comment-15354827
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68907947
  
--- Diff: docs/setup/config.md ---
@@ -230,8 +240,8 @@ definition. This scheme is used **ONLY** if no other 
scheme is specified (explic
 
 ## YARN
 
-- `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to 
remove from containers started by YARN. When a user requests a certain amount 
of memory for each TaskManager container (for example 4 GB), we can not pass 
this amount as the maximum heap space for the JVM (`-Xmx` argument) because the 
JVM is also allocating memory outside the heap. YARN is very strict with 
killing containers which are using more memory than requested. Therefore, we 
remove a 15% of the memory from the requested heap as a safety margin.
-- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut 
off the requested heap size.
+- `container.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space 
to remove from containers started by YARN. When a user requests a certain 
amount of memory for each TaskManager container (for example 4 GB), we can not 
pass this amount as the maximum heap space for the JVM (`-Xmx` argument) 
because the JVM is also allocating memory outside the heap. YARN is very strict 
with killing containers which are using more memory than requested. Therefore, 
we remove a 15% of the memory from the requested heap as a safety margin.
--- End diff --

Maybe insert *, for example,* after YARN because the parameter is not 
specific to YARN, is it?


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
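
As a rough illustration of the cutoff arithmetic described in the diff above
(a sketch using the documented defaults of ratio 0.25 and minimum 384 MB, not
code taken from Flink itself):

    // Requested container size for a TaskManager, e.g. 4 GB.
    long containerMemoryMB = 4096;

    // Cutoff is the larger of ratio * container size and the minimum cutoff.
    long cutoffMB = Math.max((long) (containerMemoryMB * 0.25), 384); // 1024 MB

    // The JVM heap gets what remains, i.e. the JVM is started with -Xmx3072m.
    long xmxMB = containerMemoryMB - cutoffMB;
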


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68907947
  
--- Diff: docs/setup/config.md ---
@@ -230,8 +240,8 @@ definition. This scheme is used **ONLY** if no other 
scheme is specified (explic
 
 ## YARN
 
-- `yarn.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space to 
remove from containers started by YARN. When a user requests a certain amount 
of memory for each TaskManager container (for example 4 GB), we can not pass 
this amount as the maximum heap space for the JVM (`-Xmx` argument) because the 
JVM is also allocating memory outside the heap. YARN is very strict with 
killing containers which are using more memory than requested. Therefore, we 
remove a 15% of the memory from the requested heap as a safety margin.
-- `yarn.heap-cutoff-min`: (Default 384 MB) Minimum amount of memory to cut 
off the requested heap size.
+- `container.heap-cutoff-ratio`: (Default 0.25) Percentage of heap space 
to remove from containers started by YARN. When a user requests a certain 
amount of memory for each TaskManager container (for example 4 GB), we can not 
pass this amount as the maximum heap space for the JVM (`-Xmx` argument) 
because the JVM is also allocating memory outside the heap. YARN is very strict 
with killing containers which are using more memory than requested. Therefore, 
we remove a 15% of the memory from the requested heap as a safety margin.
--- End diff --

Maybe insert *, for example,* after YARN because the parameter is not 
specific to YARN, is it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354821#comment-15354821
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68907634
  
--- Diff: docs/setup/config.md ---
@@ -107,12 +109,20 @@ Please make sure to set the maximum ticket life span 
high long running jobs. The
 
 If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
 
+### Resource Manager
+
+- `resourcemanager.rpc.port`: The config parameter defining the network 
port to connect to for communication with the resource manager.
--- End diff --

Is it possible to specify a port range here?


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354822#comment-15354822
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68907744
  
--- Diff: docs/setup/config.md ---
@@ -107,12 +109,20 @@ Please make sure to set the maximum ticket life span 
high long running jobs. The
 
 If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
 
+### Resource Manager
+
+- `resourcemanager.rpc.port`: The config parameter defining the network 
port to connect to for communication with the resource manager.
+
 ### Other
 
 - `taskmanager.tmp.dirs`: The directory for temporary files, or a list of 
directories separated by the systems directory delimiter (for example ':' 
(colon) on Linux/Unix). If multiple directories are specified, then the 
temporary files will be distributed across the directories in a round-robin 
fashion. The I/O manager component will spawn one reading and one writing 
thread per directory. A directory may be listed multiple times to have the I/O 
manager use multiple threads for it (for example if it is physically stored on 
a very fast disc or RAID) (DEFAULT: The system's tmp dir).
 
+- `taskmanager.log.path`: The config parameter defining the taskmanager 
log file location
+
 - `jobmanager.web.port`: Port of the JobManager's web interface (DEFAULT: 
8081).
 
+- `jobmanager.web.tmpdir`: This configuration parameter allows defining 
the Flink web directory to be used by the web interface.
--- End diff --

what is a web directory?


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-...

2016-06-29 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68908245
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

Yes, I used verify. The problem was that I was building from 
`flink/flink-streaming-connectors` instead of root to save some time, and the 
RAT plugin wasn't executed. I'll remember next time ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2177: [FLINK-4127] Check API compatbility for 1.1 in flink-core

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2177
  
Good changes @rmetzger.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354828#comment-15354828
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2180#discussion_r68908245
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/SequenceNumber.java
 ---
@@ -0,0 +1,87 @@
+package org.apache.flink.streaming.connectors.kinesis.model;
--- End diff --

Yes, I used verify. The problem was that I was building from 
`flink/flink-streaming-connectors` instead of root to save some time, and the 
RAT plugin wasn't executed. I'll remember next time ;)


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may be a de-aggregated 
> shard, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record 
> of the 5th record was last seen for shard 0. On restore, we start again from 
> record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 
> we start from record 3 since record 2 is non-aggregated and already fully 
> processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354829#comment-15354829
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2177
  
Good changes @rmetzger.


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68908671
  
--- Diff: docs/setup/config.md ---
@@ -107,12 +109,20 @@ Please make sure to set the maximum ticket life span 
high long running jobs. The
 
 If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
 
+### Resource Manager
+
+- `resourcemanager.rpc.port`: The config parameter defining the network 
port to connect to for communication with the resource manager.
--- End diff --

No, it doesn't. It is currently not in use (except for testing code). In 
Yarn, the port of the application master is always the port of the job manager 
because the two run in the same actor system. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354833#comment-15354833
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68908671
  
--- Diff: docs/setup/config.md ---
@@ -107,12 +109,20 @@ Please make sure to set the maximum ticket life span 
high long running jobs. The
 
 If you are on YARN, then it is sufficient to authenticate the client with 
Kerberos. On a Flink standalone cluster you need to ensure that, initially, all 
nodes are authenticated with Kerberos using the `kinit` tool.
 
+### Resource Manager
+
+- `resourcemanager.rpc.port`: The config parameter defining the network 
port to connect to for communication with the resource manager.
--- End diff --

No, it doesn't. It is currently not in use (except for testing code). In 
Yarn, the port of the application master is always the port of the job manager 
because the two run in the same actor system. 


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68908810
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

Not really, I'm afraid. I just recently disabled it by default because of 
the known issues.

What I can gather from the code is that it is a separate sort buffer that 
is intended for very large records. It is not obvious from the code what "very 
large records" are, however.

The known problem is that key serialization does not work correctly if the 
user specified a custom type or if Scala types are used because the 
`TypeAnalyzer` is used in the `LargeRecordHandler` to get a `TypeInformation` 
on the fly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68908870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
 ---
@@ -106,10 +108,12 @@
 * The latest completed checkpoint (highest ID) or null.
 */
private CompletedCheckpoint latestCompletedCheckpoint;
+   private CheckpointStats latestCompletedCheckpointStats;
--- End diff --

Where is this variable set?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354839#comment-15354839
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68908810
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

Not really, I'm afraid. I just recently disabled it by default because of 
the known issues.

What I can gather from the code is that it is a separate sort buffer that 
is intended for very large records. It is not obvious from the code what "very 
large records" are, however.

The known problem is that key serialization does not work correctly if the 
user specified a custom type or if Scala types are used because the 
`TypeAnalyzer` is used in the `LargeRecordHandler` to get a `TypeInformation` 
on the fly.


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354840#comment-15354840
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68908870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
 ---
@@ -106,10 +108,12 @@
 * The latest completed checkpoint (highest ID) or null.
 */
private CompletedCheckpoint latestCompletedCheckpoint;
+   private CheckpointStats latestCompletedCheckpointStats;
--- End diff --

Where is this variable set?


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68909550
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

Where do we close the `jobMetrics` group?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354847#comment-15354847
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68909752
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

We only have to create a job metric group if we also create a 
`SimpleCheckpointStatsTracker`.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354846#comment-15354846
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68909550
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

Where do we close the `jobMetrics` group?


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68909752
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

We only have to create a job metric group if we also create a 
`SimpleCheckpointStatsTracker`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
I think it is good to integrate the metrics into the 
`CheckpointStatsTracker`.

Even if such a test seems to be icky, I think it would be a valuable 
addition. We already use this kind of pattern for a multitude of other tests 
and it has proven to work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354849#comment-15354849
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
I think it is good to integrate the metrics into the 
`CheckpointStatsTracker`.

Even if such a test seems to be icky, I think it would be a valuable 
addition. We already use this kind of pattern for a multitude of other tests 
and it has proven to work.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2149: [FLINK-4084] Add configDir parameter to CliFronten...

2016-06-29 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r68910391
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -132,9 +132,9 @@
 
 
 
-   private final Configuration config;
+   private Configuration config;
--- End diff --

Could we make the `configDir` parameter a general parameter that goes 
before `run`, `list`, `cancel`, etc.? Don't know if that is too tricky but it 
would make sense because it is a general configuration key.

`flink --configDir /my/custom/dir run -p 10 /path/to/jar`

That way the config parameter wouldn't have to be listed for every action. 
Would make the help look less verbose.

You could parse the config parameter in the constructor and initialize the 
config only once. Then continue with the normal parsing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
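
A sketch of how such a leading global option could be parsed before the action
keyword, assuming Apache Commons CLI; the option and variable names here are
illustrative, not the actual CliFrontend implementation:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.DefaultParser;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class GlobalOptionsSketch {
        public static void main(String[] args) throws ParseException {
            Options global = new Options();
            global.addOption(null, "configDir", true,
                "Path to the Flink configuration directory");

            // stopAtNonOption = true: parsing stops at the first non-option
            // token, i.e. the action keyword ("run", "list", "cancel", ...).
            CommandLine cmd = new DefaultParser().parse(global, args, true);

            String configDir = cmd.getOptionValue("configDir"); // null if absent
            String[] actionAndArgs = cmd.getArgs(); // e.g. ["run", "-p", "10", "/path/to/jar"]
        }
    }
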


[jira] [Commented] (FLINK-4084) Add configDir parameter to CliFrontend and flink shell script

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354850#comment-15354850
 ] 

ASF GitHub Bot commented on FLINK-4084:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2149#discussion_r68910391
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -132,9 +132,9 @@
 
 
 
-   private final Configuration config;
+   private Configuration config;
--- End diff --

Could we make the `configDir` parameter a general parameter that goes 
before `run`, `list`, `cancel`, etc.? Don't know if that is too tricky but it 
would make sense because it is a general configuration key.

`flink --configDir /my/custom/dir run -p 10 /path/to/jar`

That way the config parameter wouldn't have to be listed for every action. 
Would make the help look less verbose.

You could parse the config parameter in the constructor and initialize the 
config only once. Then continue with the normal parsing.


> Add configDir parameter to CliFrontend and flink shell script
> -
>
> Key: FLINK-4084
> URL: https://issues.apache.org/jira/browse/FLINK-4084
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
>Assignee: Andrea Sella
>Priority: Minor
>
> At the moment there is no other way than the environment variable 
> FLINK_CONF_DIR to specify the configuration directory for the CliFrontend if 
> it is started via the flink shell script. In order to improve the user 
> exprience, I would propose to introduce a {{--configDir}} parameter which the 
> user can use to specify a configuration directory more easily.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68910582
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows users to gather and expose metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling getRuntimeContext().getMetricGroup().
+This method returns a MetricGroup object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received 
you could use a Counter like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports Counters, Gauges and Histograms.
+
+A Counter is used to count something. The current value can be in- or 
decremented using `inc([long n])` or `dec([long n])`.
+You can create and register a Counter by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own Counter 
implementation along with the name.
+
+A Gauge provides a value of any type on demand. In order to use a Gauge 
you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A Histogram measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that allows 
usage of Codahale/DropWizard Histograms.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default, a metric that is registered in 
a user function will be scoped to the Operator in which the function runs, the 
Task/Job it belongs to and the TaskManager/Host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling the 
`MetricGroup#addGroup((int/String) name)` method.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>.\<user-scope\>.\<name\>.
--- End diff --

I don't understand how backticks solve the issue I mentioned.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
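
For the Gauge interface described in the diff above, a minimal usage sketch
(assuming `org.apache.flink.metrics.Gauge` exposes a single getValue() method,
as the text states; the class and metric names are illustrative):

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;

    public class MyGaugingMapper extends RichMapFunction<String, String> {
        private transient int valuesSeen;

        @Override
        public void open(Configuration config) {
            getRuntimeContext().getMetricGroup()
                .gauge("valuesSeen", new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        // reported on demand by the metric system
                        return valuesSeen;
                    }
                });
        }

        @Override
        public String map(String value) throws Exception {
            valuesSeen++;
            return value;
        }
    }
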


[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354854#comment-15354854
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68910582
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows users to gather and expose metrics 
to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling getRuntimeContext().getMetricGroup().
+This method returns a MetricGroup object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received 
you could use a Counter like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports Counters, Gauges and Histograms.
+
+A Counter is used to count something. The current value can be in- or 
decremented using `inc([long n])` or `dec([long n])`.
+You can create and register a Counter by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own Counter 
implementation along with the name.
+
+A Gauge provides a value of any type on demand. In order to use a Gauge 
you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A Histogram measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram 
histogram)` on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that allows 
usage of Codahale/DropWizard histograms.
+
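For example, a Codahale/DropWizard histogram could be wrapped like this (a 
sketch; it assumes the wrapper class named above and a dependency on the 
DropWizard metrics library):

{% highlight java %}

// create a DropWizard histogram and register it as a Flink metric via the wrapper
com.codahale.metrics.Histogram dropWizardHistogram =
  new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

this.histogram = getRuntimeContext()
  .getMetricGroup()
  .histogram("myHistogram", new DropWizardHistogramWrapper(dropWizardHistogram));

{% endhighlight %}
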
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default, a metric that is registered 
in a user function will be scoped to the Operator in which the function runs, 
the Task/Job it belongs to, and the TaskManager/Host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling the 
`MetricGroup#addGroup(int/String name)` method.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>.\<user-scope\>.\<metric-name\>.
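For example, with the default system scope, the counter registered above might 
be exported under a name like 
"localhost.taskmanager.1.MyJob.MyMapper.MyMetrics.myCounter2" (illustrative 
only; the exact composition of the system scope may differ).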
--- End diff --

I don't understand how backticks solve the issue I mentioned.


> Document metrics
> 
>
> Key: FLINK-4116
> URL: https://issues.apache.org/jira/browse/FLINK-4116
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> The metric system is currently not documented, which should be fixed before 
> the 1.1 release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68913433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
 ---
@@ -106,10 +108,12 @@
 * The latest completed checkpoint (highest ID) or null.
 */
private CompletedCheckpoint latestCompletedCheckpoint;
+   private CheckpointStats latestCompletedCheckpointStats;
--- End diff --

this variable is unused.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354870#comment-15354870
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68913433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
 ---
@@ -106,10 +108,12 @@
 * The latest completed checkpoint (highest ID) or null.
 */
private CompletedCheckpoint latestCompletedCheckpoint;
+   private CheckpointStats latestCompletedCheckpointStats;
--- End diff --

this variable is unused.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68914059
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

JobManager.scala, line 1664.

I would also create it regardless of the Tracker. The moment we use it in 
another component we would have to move it out of that condition anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354874#comment-15354874
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68914059
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

JobManager.scala, line 1664.

I would also create it regardless of the Tracker. The moment we use it in 
another component we would have to move it out of that condition anyway.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354875#comment-15354875
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2146
  
Can you point me to a test that does this?


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2146
  
Can you point me to a test that does this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-once fo...

2016-06-29 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2180
  
+1 the change is good to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354906#comment-15354906
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2180
  
+1 the change is good to merge.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may belong to a 
> de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record 
> of the 5th record was last seen for shard 0. On restore, we start again from 
> record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 
> we start from record 3 since record 2 is non-aggregated and already fully 
> processed.
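
A minimal sketch of the per-shard state described in the proposed fix might look 
like this (hypothetical names, not the actual implementation):

{code}
// Checkpointed per-shard state: the last seen sequence number, plus an
// optional sub-sequence number for records that were de-aggregated from the
// same Kinesis record. A value of -1 marks a record that was not aggregated.
public class SequenceNumber implements java.io.Serializable {
    private final String sequenceNumber;
    private final long subSequenceNumber;

    public SequenceNumber(String sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    public boolean isAggregated() {
        return subSequenceNumber >= 0;
    }
}
{code}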



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4127) Clean up configuration and check breaking API changes

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354908#comment-15354908
 ] 

ASF GitHub Bot commented on FLINK-4127:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68918991
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

Maybe we should not mention this in the documentation then?


> Clean up configuration and check breaking API changes
> -
>
> Key: FLINK-4127
> URL: https://issues.apache.org/jira/browse/FLINK-4127
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.1.0
>
> Attachments: flink-core.html, flink-java.html, flink-scala.html, 
> flink-streaming-java.html, flink-streaming-scala.html
>
>
> For the upcoming 1.1 release, I'll check if there are any breaking API 
> changes and if the documentation is up to date with the configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2177: [FLINK-4127] Check API compatbility for 1.1 in fli...

2016-06-29 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2177#discussion_r68918991
  
--- Diff: docs/setup/config.md ---
@@ -85,6 +85,8 @@ The default fraction for managed memory can be adjusted 
using the `taskmanager.m
 
 - `taskmanager.memory.preallocate`: Can be either of `true` or `false`. 
Specifies whether task managers should allocate all managed memory when 
starting up. (DEFAULT: false)
 
+- `taskmanager.runtime.large-record-handler`: Whether to use the 
LargeRecordHandler when spilling. This feature is experimental. (DEFAULT: false)
--- End diff --

Maybe we should not mention this in the documentation then?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3294) KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets()

2016-06-29 Thread Jens Kat (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15354983#comment-15354983
 ] 

Jens Kat commented on FLINK-3294:
-

I am interested in using 'option 2', committing into a consumer_offset topic of 
Kafka, instead of using ZooKeeper as the offset store.
What is the status of your code? Is it far from complete, or does it just need 
some tidying up? I can try to help with that.

> KafkaConsumer (0.8) commit offsets using SimpleConsumer.commitOffsets()
> ---
>
> Key: FLINK-3294
> URL: https://issues.apache.org/jira/browse/FLINK-3294
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Robert Metzger
>Assignee: Robert Metzger
>
> Currently, the 0.8 consumer for Kafka is committing the offsets manually into 
> Zookeeper so that users can track the lag using external tools.
> The 0.8 consumer has a pluggable design, and this component is easily 
> pluggable.
> Since OffsetCommitRequest version=1 (supported in 0.8.2 or later), users can 
> choose between two offset commit modes:
> a) Let the broker commit into ZK (this is what we are doing from the consumer)
> b) Let the broker commit the offset into a special topic.
> By adding a different "OffsetHandler" backend, users can commit offsets from 
> the brokers (reducing the total number of ZK connections) or into the 
> broker's offset topic.
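
A sketch of such a pluggable backend could be as small as the following 
interface (hypothetical names and signatures, following the description above):

{code}
// Pluggable offset commit backend: one implementation writes offsets directly
// to ZooKeeper (the current behavior), another sends an OffsetCommitRequest
// (version >= 1, i.e. Kafka 0.8.2+) so that the broker stores them in ZK or
// in the special offsets topic.
public interface OffsetHandler {

    /** Commits the given offsets, keyed by topic partition. */
    void commit(Map<TopicAndPartition, Long> offsetsToCommit) throws Exception;

    /** Closes the handler, releasing e.g. open broker or ZooKeeper connections. */
    void close() throws Exception;
}
{code}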



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2181: [FLINK-4033][docs, kinesis-connector] Polish up Ki...

2016-06-29 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2181

[FLINK-4033][docs, kinesis-connector] Polish up Kinesis connector 
documentation

The main purpose of this PR is to add Scala example snippets for the Kinesis 
consumer & producer.
I've come across some other missing info & typos in the documentation and 
fixed them as well as part of the PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4033

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2181


commit 7d3caf424eba675e14fa2f219893156be5d390eb
Author: Gordon Tai 
Date:   2016-06-29T10:29:08Z

[FLINK-4033] Polish up Kinesis connector documentation

Includes:
1. Scala examples for consumer and producer
2. Add information about AWS Kinesis service usage
3. Add Kinesis connecter to the fault tolerance guarantees table
4. Minor typo fix in Kafka documentation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4033) Missing Scala example snippets for the Kinesis Connector documentation

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355028#comment-15355028
 ] 

ASF GitHub Bot commented on FLINK-4033:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2181

[FLINK-4033][docs, kinesis-connector] Polish up Kinesis connector 
documentation

The main purpose of this PR is to add Scala example snippets for the Kinesis 
consumer & producer.
I've come across some other missing info & typos in the documentation and 
fixed them as well as part of the PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4033

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2181


commit 7d3caf424eba675e14fa2f219893156be5d390eb
Author: Gordon Tai 
Date:   2016-06-29T10:29:08Z

[FLINK-4033] Polish up Kinesis connector documentation

Includes:
1. Scala examples for consumer and producer
2. Add information about AWS Kinesis service usage
3. Add Kinesis connector to the fault tolerance guarantees table
4. Minor typo fix in Kafka documentation




> Missing Scala example snippets for the Kinesis Connector documentation
> --
>
> Key: FLINK-4033
> URL: https://issues.apache.org/jira/browse/FLINK-4033
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.1.0
>
>
> The documentation for the Kinesis connector is missing Scala version of the 
> example snippets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2131: [FLINK-3231][streaming-connectors] FlinkKinesisConsumer r...

2016-06-29 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
I'll rebase this after #2180 is merged to reflect the exactly-once fix in this 
PR, and commit it all together with the changes to address the remaining 
comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355039#comment-15355039
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2131
  
I'll rebase this after #2180 is merged to reflect the exactly-once fix in this 
PR, and commit it all together with the changes to address the remaining 
comments.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.
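
The round-robin shard-to-task mapping mentioned above can be computed locally by 
each subtask, for example like this (a sketch with hypothetical names):

{code}
// A shard belongs to this subtask if its index maps to the subtask index in a
// round-robin fashion. Every subtask can evaluate this without coordination,
// which is exactly what breaks once shards are merged or split at runtime.
public static boolean isAssignedToSubtask(int shardIndex, int subtaskIndex, int parallelism) {
    return shardIndex % parallelism == subtaskIndex;
}
{code}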



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2131: [FLINK-3231][streaming-connectors] FlinkKinesisCon...

2016-06-29 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68930725
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher 
should subscribe to.
+ * It is submitted to {@link 
KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs 
until the
+ * fetcher is closed. Whenever it discovers a new shard that should be 
subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. 
where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
--- End diff --

Yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355053#comment-15355053
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2131#discussion_r68930725
  
--- Diff: 
flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardDiscoverer.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.internals;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard;
+import 
org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState;
+import 
org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This runnable is in charge of discovering new shards that a fetcher 
should subscribe to.
+ * It is submitted to {@link 
KinesisDataFetcher#shardDiscovererAndSubscriberExecutor} and continuously runs 
until the
+ * fetcher is closed. Whenever it discovers a new shard that should be 
subscribed to, the shard is added to the
+ * {@link KinesisDataFetcher#pendingShards} queue with initial state, i.e. 
where in the new shard we should start
+ * consuming from.
+ */
+public class ShardDiscoverer implements Runnable {
--- End diff --

Yes.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.

[GitHub] flink issue #2131: [FLINK-3231][streaming-connectors] FlinkKinesisConsumer r...

2016-06-29 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
Okay, I'll merge the exactly-once fix now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3231) Handle Kinesis-side resharding in Kinesis streaming consumer

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355054#comment-15355054
 ] 

ASF GitHub Bot commented on FLINK-3231:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2131
  
Okay, I'll merge the exactly-once fix now.


> Handle Kinesis-side resharding in Kinesis streaming consumer
> 
>
> Key: FLINK-3231
> URL: https://issues.apache.org/jira/browse/FLINK-3231
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis 
> users can choose to "merge" and "split" shards at any time for adjustable 
> stream throughput capacity. This article explains this quite clearly: 
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic 
> version of the Kinesis consumer 
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task 
> mapping is done in a simple round-robin-like distribution which can be 
> locally determined at each Flink consumer task (Flink Kafka consumer does 
> this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer 
> tasks coordinate which shards they are currently handling, and allow the 
> tasks to ask the coordinator for a shards reassignment when the task finds 
> out it has found a closed shard at runtime (shards will be closed by Kinesis 
> when it is merged and split).
> We need a centralized coordinator state store which is visible to all Flink 
> consumer tasks. Tasks can use this state store to locally determine what 
> shards it can be reassigned. Amazon KCL uses a DynamoDB table for the 
> coordination, but as described in 
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use 
> KCL for the implementation of the consumer if we want to leverage Flink's 
> checkpointing mechanics. For our own implementation, Zookeeper can be used 
> for this state store, but that means it would require the user to set up ZK 
> to work.
> Since this feature introduces extensive work, it is opened as a separate 
> sub-task from the basic implementation 
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355056#comment-15355056
 ] 

ASF GitHub Bot commented on FLINK-4080:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2180


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may belong to a 
> de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record 
> of the 5th record was last seen for shard 0. On restore, we start again from 
> record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 
> we start from record 3 since record 2 is non-aggregated and already fully 
> processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2180: [FLINK-4080][kinesis-connector] Guarantee exactly-...

2016-06-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2180


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4080) Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records

2016-06-29 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4080.
---
Resolution: Fixed

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/fa42cdab

> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> ---
>
> Key: FLINK-4080
> URL: https://issues.apache.org/jira/browse/FLINK-4080
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records 
> (either non- or de-aggregated) instead of the basic `Record` class. This 
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to 
> indicate that the last seen sequence number of a shard may belong to a 
> de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning the 8th sub-record 
> of the 5th record was last seen for shard 0. On restore, we start again from 
> record 5 for shard 0 and skip the first 7 sub-records; however, for shard 1 
> we start from record 3 since record 2 is non-aggregated and already fully 
> processed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2158: [FLINK-4116] Metrics documentation

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68931051
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows users to gather metrics and 
expose them to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a MetricGroup object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received, 
you could use a Counter like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports Counters, Gauges and Histograms.
+
+A Counter is used to count something. The current value can be incremented or 
decremented using `inc([long n])` or `dec([long n])`.
+You can create and register a Counter by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own Counter 
implementation along with the name.
+
+A Gauge provides a value of any type on demand. In order to use a Gauge 
you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A Histogram measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram histogram)` 
on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that allows 
usage of Codahale/DropWizard histograms.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default, a metric that is registered 
in a user function will be scoped to the Operator in which the function runs, 
the Task/Job it belongs to, and the TaskManager/Host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling the 
`MetricGroup#addGroup(int/String name)` method.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>.\<user-scope\>.\<metric-name\>.
--- End diff --

Hmm, I think you're right, since the periods probably won't be rendered 
differently. But you could put the whole name in quotation marks. Then it 
should be clear.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4116) Document metrics

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355058#comment-15355058
 ] 

ASF GitHub Bot commented on FLINK-4116:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2158#discussion_r68931051
  
--- Diff: docs/apis/common/index.md ---
@@ -1350,3 +1350,211 @@ You may specify program arguments before the job is 
executed. The plan visualiza
 the execution plan before executing the Flink job.
 
 {% top %}
+
+Metrics
+---
+
+Flink exposes a metric system that allows users to gather metrics and 
expose them to external systems.
+
+### Registering metrics
+
+You can access the metric system from any user function that extends 
[RichFunction]({{ site.baseurl }}/apis/common/index.html#rich-functions) by 
calling `getRuntimeContext().getMetricGroup()`.
+This method returns a MetricGroup object on which you can create and 
register new metrics.
+
+If you want to count the number of records your user function has received, 
you could use a Counter like this:
+
+{% highlight java %}
+
+public class MyMapper extends RichMapFunction<String, Integer> {
+  private Counter counter;
+
+  @Override
+  public void open(Configuration config) {
+// create and register a counter
+this.counter = 
getRuntimeContext().getMetricGroup().counter("myCounter");
+...
+  }
+
+  @Override
+  public Integer map(String value) throws Exception {
+// increment counter
+this.counter.inc();
+...
+  }
+}
+
+{% endhighlight %}
+
+### Metric types
+
+Flink supports Counters, Gauges and Histograms.
+
+A Counter is used to count something. The current value can be incremented or 
decremented using `inc([long n])` or `dec([long n])`.
+You can create and register a Counter by calling `counter(String name)` 
on a MetricGroup. Alternatively, you can pass an instance of your own Counter 
implementation along with the name.
+
+A Gauge provides a value of any type on demand. In order to use a Gauge 
you must first create a class that implements the 
`org.apache.flink.metrics.Gauge` interface.
+There is no restriction on the type of the returned value.
+You can register a gauge by calling `gauge(String name, Gauge gauge)` on a 
MetricGroup.
+
+A Histogram measures the distribution of long values.
+You can register one by calling `histogram(String name, Histogram histogram)` 
on a MetricGroup.
+
+Flink only provides an interface for Histograms, but offers a wrapper 
(`org.apache.flink.dropwizard.metrics.DropWizardHistogramWrapper`) that allows 
usage of Codahale/DropWizard histograms.
+
+### Scope
+
+Every registered metric has an automatically assigned scope which 
represents the entities it is tied to. By default, a metric that is registered 
in a user function will be scoped to the Operator in which the function runs, 
the Task/Job it belongs to, and the TaskManager/Host it is executed on. This is 
referred to as the "system scope".
+
+You can define an additional "user scope" by calling the 
`MetricGroup#addGroup(int/String name)` method.
+
+{% highlight java %}
+
+counter2 = 
getRuntimeContext().getMetricGroup().addGroup("MyMetrics").counter("myCounter2");
+
+{% endhighlight %}
+
+The name under which a metric is exported is based on both scopes and the 
name passed in the `counter()` call. The order is always 
\<system-scope\>.\<user-scope\>.\<metric-name\>.
--- End diff --

Hmm, I think you're right, since the periods probably won't be rendered 
differently. But you could put the whole name in quotation marks. Then it 
should be clear.


> Document metrics
> 
>
> Key: FLINK-4116
> URL: https://issues.apache.org/jira/browse/FLINK-4116
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Metrics
>Affects Versions: 1.1.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.1.0
>
>
> The metric system is currently not documented, which should be fixed before 
> the 1.1 release.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2181: [FLINK-4033][docs, kinesis-connector] Polish up Kinesis c...

2016-06-29 Thread rmetzger
Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2181
  
Thank you for opening a PR. I'll merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68931182
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

But then we should do it when it becomes necessary and not now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4033) Missing Scala example snippets for the Kinesis Connector documentation

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355060#comment-15355060
 ] 

ASF GitHub Bot commented on FLINK-4033:
---

Github user rmetzger commented on the issue:

https://github.com/apache/flink/pull/2181
  
Thank you for opening a PR. I'll merge it.


> Missing Scala example snippets for the Kinesis Connector documentation
> --
>
> Key: FLINK-4033
> URL: https://issues.apache.org/jira/browse/FLINK-4033
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.1.0
>
>
> The documentation for the Kinesis connector is missing Scala version of the 
> example snippets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355061#comment-15355061
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2146#discussion_r68931182
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -1235,7 +1234,7 @@ class JobManager(
 ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
 
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE)
 
-  new SimpleCheckpointStatsTracker(historySize, ackVertices)
+  new SimpleCheckpointStatsTracker(historySize, ackVertices, 
jobMetrics)
--- End diff --

But then we should do it when it becomes necessary and not now.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1550) Show JVM Metrics for JobManager

2016-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15355063#comment-15355063
 ] 

ASF GitHub Bot commented on FLINK-1550:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
The `JobManagerHARecoveryTest.testJobRecoveryWhenLosingLeadership` uses a 
`BlockingInvokable`, for example.


> Show JVM Metrics for JobManager
> ---
>
> Key: FLINK-1550
> URL: https://issues.apache.org/jira/browse/FLINK-1550
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
> Fix For: pre-apache
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2146: [FLINK-1550/FLINK-4057] Add JobManager Metrics

2016-06-29 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2146
  
The `JobManagerHARecoveryTest.testJobRecoveryWhenLosingLeadership` uses a 
`BlockingInvokable`, for example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4033) Missing Scala example snippets for the Kinesis Connector documentation

2016-06-29 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4033.
---
Resolution: Fixed

Merged in http://git-wip-us.apache.org/repos/asf/flink/commit/256c9c4d

Thank you for all your work!

> Missing Scala example snippets for the Kinesis Connector documentation
> --
>
> Key: FLINK-4033
> URL: https://issues.apache.org/jira/browse/FLINK-4033
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Kinesis Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Minor
> Fix For: 1.1.0
>
>
> The documentation for the Kinesis connector is missing Scala version of the 
> example snippets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2181: [FLINK-4033][docs, kinesis-connector] Polish up Ki...

2016-06-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2181


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

