[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15839336#comment-15839336
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/605
  

[![Coverage Status](https://coveralls.io/builds/9850383/badge)](https://coveralls.io/builds/9850383)

Changes Unknown when pulling **342f5c99cc530ccf2a6281223d1f4f917f1fb497 on 
tammymendt:FLINK-1297-v2** into ** on apache:master**.



> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Alexander Alexandrov
>Assignee: Tamara
> Fix For: 0.10.0
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback from the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.
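
As a rough, illustrative sketch of the kind of per-operator statistics proposed above (min, max, count, count distinct), the snippet below uses stream-lib's `HyperLogLog` for the count-distinct estimate; the class and field names are stand-ins, not code from the eventual pull request.

```java
// Hedged sketch of basic per-operator statistics for an intermediate result.
// HyperLogLog comes from the stream-lib library (com.clearspring.analytics).
import com.clearspring.analytics.stream.cardinality.HyperLogLog;

class IntermediateResultStats {
    long count = 0;
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    final HyperLogLog countDistinct = new HyperLogLog(10);   // log2m = 10

    void add(long value) {
        count++;
        min = Math.min(min, value);
        max = Math.max(max, value);
        countDistinct.offer(value);          // lightweight cardinality sketch
    }

    long estimatedDistinct() {
        return countDistinct.cardinality();  // approximate count distinct
    }
}
```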



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736513#comment-14736513
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138847826
  
After rebasing the PR to the current master, the 
`OperatorStatsAccumulatorTest` is failing. In the original PR (based on a 
master from the beginning of August) the test is passing. @mxm, you are more 
familiar with Flink's accumulators. Can you have a look? Thanks!




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736697#comment-14736697
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138883419
  
The test I wrote checks whether the final accumulator result is the one that is 
expected. If I understand correctly, accumulator values are now sent 
periodically before a task finishes. Do I need to write an additional test to 
make sure that accumulator results before task end are also as expected? If not, 
I would say it is OK to merge. Have you written extra tests for other 
accumulators like IntCounter?




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736598#comment-14736598
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138863134
  
Hi @tammymendt. Thanks for the pull request! The accumulators work a little 
differently now because they are accumulated on a per-task basis and 
reported to the job manager at regular intervals.

The `clone()` method in `OperatorStatistics` doesn't create a deep copy of 
the object, i.e. some references are reused. That causes problems because the 
runtime accumulators are modified while they are merged for reporting to the 
job manager.

I could make the test pass with a nasty deep copy using Java serialization. 
However, I didn't manage to make a proper copy using the provided interfaces. 
I think you can probably do that faster because you know the code very well.
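
To make the shallow-copy problem concrete, here is a minimal, self-contained sketch (illustrative names only, not Flink's actual `OperatorStatistics`): a clone that reuses the mutable sketch reference lets a later merge mutate the live runtime accumulator, while a deep copy keeps it isolated.

```java
import java.util.HashSet;
import java.util.Set;

class SketchStats {
    Set<Long> distinct = new HashSet<>();

    // Shallow copy: the clone shares the same mutable sketch as the original.
    SketchStats shallowClone() {
        SketchStats c = new SketchStats();
        c.distinct = this.distinct;                  // reference reused
        return c;
    }

    // Deep copy: the clone gets its own, independent sketch.
    SketchStats deepClone() {
        SketchStats c = new SketchStats();
        c.distinct = new HashSet<>(this.distinct);   // independent copy
        return c;
    }

    // Merging mutates the receiver; with a shallow clone this also mutates
    // the original runtime accumulator, which is the problem described above.
    void merge(SketchStats other) {
        distinct.addAll(other.distinct);
    }
}
```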




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736767#comment-14736767
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138895569
  
Could you update the pull request with the new `clone()` method? I will 
then merge the pull request if it passes Travis.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736534#comment-14736534
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138852658
  
I can have a look at it this afternoon. I suspect it might be because the
function getAllAccumulators has been deprecated.
On Sep 9, 2015 11:11 AM, "Fabian Hueske" wrote:

> After rebasing the PR to the current master, the
> OperatorStatsAccumulatorTest is failing. In the original PR (based on a
> master from the beginning of August) the test is passing. @mxm, you are
> more familiar with Flink's accumulators. Can you have a look? Thanks!
>
> — Reply to this email directly or view it on GitHub.
>





[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736762#comment-14736762
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138895326
  
Yes, there are tests in place for the new accumulators. The included 
accumulators all implement a deep copy in the `clone()` method, which is why 
they didn't have to be ported. You don't need to include an extra test for the 
runtime accumulators.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736617#comment-14736617
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138866235
  
I tried again, this works:

```java
@Override
public OperatorStatistics clone() {
    OperatorStatistics clone = new OperatorStatistics(config);
    // Plain fields can be copied directly.
    clone.min = min;
    clone.max = max;
    clone.cardinality = cardinality;

    // Deep-copy the count-distinct sketch by merging it into a fresh instance.
    try {
        ICardinality copy;
        if (countDistinct instanceof LinearCounting) {
            copy = new LinearCounting(config.getCountDbitmap());
        } else if (countDistinct instanceof HyperLogLog) {
            copy = new HyperLogLog(config.getCountDlog2m());
        } else {
            throw new IllegalStateException("Unsupported counter.");
        }
        clone.countDistinct = copy.merge(countDistinct);
    } catch (CardinalityMergeException e) {
        throw new RuntimeException("Failed to clone OperatorStatistics!");
    }

    // Deep-copy the heavy-hitter sketch the same way.
    try {
        HeavyHitter copy;
        if (heavyHitter instanceof LossyCounting) {
            copy = new LossyCounting(config.getHeavyHitterFraction(), config.getHeavyHitterError());
        } else if (heavyHitter instanceof CountMinHeavyHitter) {
            copy = new CountMinHeavyHitter(config.getHeavyHitterFraction(),
                    config.getHeavyHitterError(),
                    config.getHeavyHitterConfidence(),
                    config.getHeavyHitterSeed());
        } else {
            throw new IllegalStateException("Unsupported counter.");
        }
        copy.merge(heavyHitter);
        clone.heavyHitter = copy;
    } catch (HeavyHitterMergeException e) {
        throw new RuntimeException("Failed to clone OperatorStatistics!");
    }

    return clone;
}
```

Do you think we could merge your pull request with this change?




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736823#comment-14736823
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138906837
  
I rebased with master and included the new clone() method. I had to include 
extra conditionals that check whether count distinct or heavy hitters are being 
tracked because otherwise the clone method would throw an exception in some 
cases in which it shouldn't. Also, I added some tests to the 
OperatorStatsAccumulatorTest class because I realized that I had not written 
tests for different configurations of the OperatorStatsConfig class.
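
A hedged sketch of what such guarded cloning could look like; the class and field names below are illustrative stand-ins, not the code in the pull request.

```java
// Hedged sketch: clone() guarded by configuration flags so statistics that are
// not being tracked are never copied, avoiding the exception mentioned above.
import java.util.HashSet;
import java.util.Set;

class GuardedStats {
    final boolean trackCountDistinct;
    final boolean trackHeavyHitters;
    Set<Long> countDistinct;   // stand-in for an ICardinality sketch
    Set<Long> heavyHitters;    // stand-in for a HeavyHitter sketch

    GuardedStats(boolean trackCountDistinct, boolean trackHeavyHitters) {
        this.trackCountDistinct = trackCountDistinct;
        this.trackHeavyHitters = trackHeavyHitters;
        this.countDistinct = trackCountDistinct ? new HashSet<>() : null;
        this.heavyHitters = trackHeavyHitters ? new HashSet<>() : null;
    }

    @Override
    public GuardedStats clone() {
        GuardedStats copy = new GuardedStats(trackCountDistinct, trackHeavyHitters);
        // Deep-copy only the sketches that are actually tracked; untracked
        // sketches stay null, so no copy logic dereferences them.
        if (trackCountDistinct) {
            copy.countDistinct = new HashSet<>(countDistinct);
        }
        if (trackHeavyHitters) {
            copy.heavyHitters = new HashSet<>(heavyHitters);
        }
        return copy;
    }
}
```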




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736999#comment-14736999
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138939760
  
Thanks @tammymendt for the PR and the last fixes. Sorry that it took us so 
long to merge it :-/




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14737013#comment-14737013
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/605




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14736985#comment-14736985
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138937490
  
Looks good. Merging this now.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14735661#comment-14735661
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138709021
  
Hi @tammymendt, it's been a while again. Sorry!
I'll try to rebase and merge this PR tomorrow.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699581#comment-14699581
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r37192039
  
--- Diff: flink-contrib/flink-operator-stats/pom.xml ---
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+<modelVersion>4.0.0</modelVersion>
+
+<parent>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-contrib-parent</artifactId>
+<version>0.10-SNAPSHOT</version>
+<relativePath>..</relativePath>
+</parent>
+
+<artifactId>flink-operator-stats</artifactId>
+<name>flink-operator-stats</name>
+
+<packaging>jar</packaging>
+
+<dependencies>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-java</artifactId>
+<version>${project.version}</version>
+</dependency>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-core</artifactId>
+<version>${project.version}</version>
+</dependency>
+<dependency>
+<groupId>org.apache.flink</groupId>
+<artifactId>flink-test-utils</artifactId>
+<version>${project.version}</version>
+<scope>test</scope>
+</dependency>
+
+<dependency>
+<groupId>com.clearspring.analytics</groupId>
--- End diff --

ASL 2.0 license, that's good.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14699585#comment-14699585
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-131840098
  
+1 for merging.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14655229#comment-14655229
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127965139
  
Yes, if there is a commitment to maintain the code and not just dump it, 
it is good to merge.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14655036#comment-14655036
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127923972
  
I will maintain the code. Though I am not committing other code to Flink, I 
am responsible for what I coded and will not bail as soon as it is merged. I 
think this feature might be useful for other people, but if you prefer not to 
merge it I can close the PR and just leave it on my branch.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14655088#comment-14655088
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127937557
  
I think there is little risk in merging it to `flink-contrib`. We made that 
place exactly for contributions like this one. If we find that the code is 
popular and used by many people, we can also move it.

In general, I am +1 for adding this feature.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14653382#comment-14653382
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127552585
  
It is a nice addition, I agree.

The question for all code that we add is who can maintain it. Code never 
just gets added and then simply works and stays. Any code that is added 
needs to be maintained by someone, be that a committer or a contributor who 
commits to maintaining it.

If we can find a maintainer, then I am fine with adding this.




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14653421#comment-14653421
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127558961
  
@StephanEwen To be honest I'm kind of puzzled and somewhat annoyed that a 
PR that

1) adds a feature that has been on the list for at least 2 years,
2) has been tested and evaluated thoroughly, and
3) has been lying around since April and was rebased at least three times

is still open. 

I was also not aware that there is a strict maintainer policy for 
`flink-contrib` commits. IMHO this is a good place to expose non-critical / 
exploratory / unstable features that might benefit multiple users of the 
project. If these turn out to be useful for enough people, I'm sure there 
will be an incentive to maintain them. Otherwise there is always the option to 
deprecate and clean up recent additions to contrib prior to the next release 
if nobody picked up on them. 




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14653257#comment-14653257
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127519735
  
Hey! So I've been using and testing this code throughout my master's thesis. 
Collecting count distinct makes jobs about 10% slower, whereas collecting heavy 
hitters can make a job 20 to 50% slower (depending on the algorithm and the 
distribution of the data). However, this overhead is lower than that of using a 
histogram accumulator (not to mention the histogram might not fit in memory). I 
think it can be a nice addition to the code, especially since it does not affect 
any core components. 

The version that I pushed now uses a bunch of conditionals to check which 
statistic is being collected. I know @fhueske did not really like this. I 
implemented another version which avoids the conditionals by using a different 
class for every type of statistic. I preferred to push this version though, 
since it has been more thoroughly tested.
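
As a rough sketch of the conditional-free alternative mentioned above, each statistic can live in its own small class behind a shared interface, so the collector simply loops over whatever is enabled; all names here are illustrative, not the code in either version of the PR.

```java
// Hedged sketch: one class per statistic behind a common interface, so
// process() and merge() need no per-record conditionals on what is tracked.
import java.util.ArrayList;
import java.util.List;

interface FieldStatistic {
    void process(long value);
    void merge(FieldStatistic other);
}

class MinStatistic implements FieldStatistic {
    long min = Long.MAX_VALUE;
    public void process(long value) { min = Math.min(min, value); }
    public void merge(FieldStatistic other) { min = Math.min(min, ((MinStatistic) other).min); }
}

class MaxStatistic implements FieldStatistic {
    long max = Long.MIN_VALUE;
    public void process(long value) { max = Math.max(max, value); }
    public void merge(FieldStatistic other) { max = Math.max(max, ((MaxStatistic) other).max); }
}

class StatsCollector {
    // Only the statistics enabled in the configuration are added to this list.
    final List<FieldStatistic> active = new ArrayList<>();

    void process(long value) {
        for (FieldStatistic s : active) {
            s.process(value);   // no branching on which statistics are collected
        }
    }
}
```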




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14580406#comment-14580406
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-110702228
  
Everything seems to be OK now. Is there still a chance to merge this before 
the upcoming release?

I have pushed two commits. The first is the original version with the bug 
fixes. The second is a refactored version that makes the code more flexible 
and extensible. The second version has a couple more classes than the first, 
though. I have pushed both in separate commits so you can decide which version 
you think is preferable. 

Cheers,

Tamara




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14580425#comment-14580425
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-110715816
  
Hi @tammymendt 

thanks for the update! I'm afraid we are already in the testing phase for 
the release and don't want to add new features. Since we postponed a few 
important features to the next release, we will push hard to get the next 
release out rather soon.

I'll have a look at the PR in detail after the release stress is over.

Fabian




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-06-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14580433#comment-14580433
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-110720993
  
Sounds good! Thanks =)




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-06-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14577160#comment-14577160
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109990676
  
Hey! 
So I fixed the NullPointerException and as far as I can see it is no longer 
occurring. Given the non-deterministic nature of the algorithms, some tests fail 
sometimes. I am looking into this. I am considering leaving out tests for the 
accuracy of the algorithms (in the accumulator test), given that they are 
already covered in other test classes. This should avoid the algorithm-accuracy 
related failures in the OperatorStatisticsAccumulatorTest class.

Also, I have been refactoring this code to avoid the conditionals 
that check which statistics are being collected in the process and merge 
methods. This is based on a remark that @fhueske made much earlier in the PR. 
Should I open a PR for the refactored code, or rather leave this one in its 
current state and wait until it is merged before opening a PR for the refactored 
version?

Cheers!
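
A minimal sketch of the distinction being drawn above: exact statistics (count, min, max) can be asserted deterministically, while accuracy checks on the approximate sketches are better left to their dedicated test classes. JUnit 4 style; the class and field names are illustrative, not the actual accumulator test.

```java
// Hedged sketch: deterministic assertions on exact statistics never flake,
// unlike tolerance checks on approximate count-distinct or heavy-hitter results.
import static org.junit.Assert.assertEquals;
import org.junit.Test;

public class OperatorStatsDeterministicTest {

    static class Stats {
        long count = 0, min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        void add(long v) { count++; min = Math.min(min, v); max = Math.max(max, v); }
    }

    @Test
    public void exactStatsAreDeterministic() {
        Stats stats = new Stats();
        for (long v : new long[] {3, 1, 4, 1, 5}) {
            stats.add(v);
        }
        assertEquals(5, stats.count);
        assertEquals(1, stats.min);
        assertEquals(5, stats.max);
    }
}
```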




[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-06-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14575617#comment-14575617
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109537258
  
Ok, I will check this out today.


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-06-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14575263#comment-14575263
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109446516
  
I am merging this for the next version.
Very nice addition, sorry for the delay.


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-06-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14575462#comment-14575462
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109490490
  
The tests seem to be non-deterministic and fail frequently.

Check out this build: 
https://travis-ci.org/StephanEwen/incubator-flink/jobs/65634990

The tests need to be more stable before we can add this to the codebase.
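
One common way to stabilize such tests, assuming the input is generated with java.util.Random 
as in the PR's test diff, is to fix the seed so every run sees identical data; whether that is 
the right fix here is up to the PR author. A minimal sketch:

    import java.util.Random;

    public class DeterministicInput {
        public static void main(String[] args) {
            // Fixed seed: every test run generates identical "skewed" input,
            // so assertions on counts and heavy hitters become reproducible.
            Random rand = new Random(0xC0FFEE);
            StringBuilder input = new StringBuilder();
            for (int i = 1; i < 1000; i++) {
                if (rand.nextDouble() < 0.2) {
                    input.append(rand.nextInt(5)).append('\n');   // frequent keys
                } else {
                    input.append(rand.nextInt(100)).append('\n'); // long tail
                }
            }
            System.out.println(input.length() + " characters of test input generated");
        }
    }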


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14504939#comment-14504939
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28776656
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information about data that has been processed in one pass.
+ * This statistical information is meant to help determine the distribution of the data processed in an
+ * operator, so that we can decide whether it is necessary to repartition the data.
+ *
+ * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct,
+ * and a structure holding the heavy hitters along with their frequency.
+ */
+public class OperatorStatistics implements Serializable {
+
+    OperatorStatisticsConfig config;
+
+    Object min;
+    Object max;
+    ICardinality countDistinct;
+    IHeavyHitter heavyHitter;
+    long cardinality = 0;
+
+    public OperatorStatistics(OperatorStatisticsConfig config) {
+        this.config = config;
+        if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) {
+            countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+        }
+        if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)) {
+            countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)) {
+            heavyHitter =
+                new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)) {
+            heavyHitter =
+                new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+                    OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+                    OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+                    OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+        }
+    }
+
+    public void process(Object tupleObject){
--- End diff --

The problem with processing only every n-th element is that precision would 
be affected, and the algorithms are already estimating as it is. I am planning 
to measure performance overhead but haven't done it so far. 
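
For reference, a sketch of the skip-interval idea under discussion; the class and method names 
are hypothetical, not part of this PR, and the comment in process() marks exactly the precision 
trade-off raised above.

    // Hypothetical sketch of sampling every n-th element before updating the sketches.
    class SampledStatsCollector {

        private final int skipInterval;   // process one element out of every skipInterval
        private long seen = 0;

        SampledStatsCollector(int skipInterval) {
            this.skipInterval = Math.max(1, skipInterval);
        }

        void maybeProcess(Object element) {
            if (seen++ % skipInterval == 0) {
                process(element);   // update min/max, count distinct and heavy hitter sketches
            }
        }

        private void process(Object element) {
            // ... sketch updates would go here; with skipInterval > 1 the estimates are
            // computed on a sample, so count distinct and heavy hitter results lose precision.
        }
    }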

I have 

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505108#comment-14505108
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28789034
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.operatorstatistics;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class);
+
+    private static final String ACCUMULATOR_NAME = "op-stats";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+
+        String input = "";
+
+        Random rand = new Random();
+
+        for (int i = 1; i < 1000; i++) {
+            if (rand.nextDouble() < 0.2){
+                input += String.valueOf(rand.nextInt(5)) + "\n";
+            } else {
+                input += String.valueOf(rand.nextInt(100)) + "\n";
+            }
+        }
+
+        String inputFile = createTempFile("datapoints.txt", input);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.readTextFile(inputFile).
+            flatMap(new StringToInt()).
+            output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+        JobExecutionResult result = env.execute();
+
+        OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME);
+        LOG.debug("Global Stats");
+        LOG.debug(globalStats.toString());
+
+        OperatorStatistics merged = null;
+
+        Map<String, Object> accResults = result.getAllAccumulatorResults();
+        for (String accumulatorName : accResults.keySet()){
+            if (accumulatorName.contains(ACCUMULATOR_NAME + "-")){
+                OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName);
+                if (merged == null){
+                    merged = localStats.clone();
+                } else {
+                    merged.merge(localStats);
+                }
+                LOG.debug("Local Stats: " + accumulatorName);
+                LOG.debug(localStats.toString());
+            }
+        }
+
+        Assert.assertEquals(globalStats.cardinality, 999);
+        Assert.assertEquals(globalStats.estimateCountDistinct(), 100);
+        Assert.assertTrue(globalStats.getHeavyHitters().size() > 0 && globalStats.getHeavyHitters().size() <= 5);
+        Assert.assertEquals(merged.getMin(), globalStats.getMin());

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505631#comment-14505631
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28816129
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.operatorstatistics;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class);
+
+    private static final String ACCUMULATOR_NAME = "op-stats";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+
+        String input = "";
+
+        Random rand = new Random();
+
+        for (int i = 1; i < 1000; i++) {
+            if (rand.nextDouble() < 0.2){
+                input += String.valueOf(rand.nextInt(5)) + "\n";
+            } else {
+                input += String.valueOf(rand.nextInt(100)) + "\n";
+            }
+        }
+
+        String inputFile = createTempFile("datapoints.txt", input);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.readTextFile(inputFile).
+            flatMap(new StringToInt()).
+            output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+        JobExecutionResult result = env.execute();
+
+        OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME);
+        LOG.debug("Global Stats");
+        LOG.debug(globalStats.toString());
+
+        OperatorStatistics merged = null;
+
+        Map<String, Object> accResults = result.getAllAccumulatorResults();
+        for (String accumulatorName : accResults.keySet()){
+            if (accumulatorName.contains(ACCUMULATOR_NAME + "-")){
+                OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName);
+                if (merged == null){
+                    merged = localStats.clone();
+                } else {
+                    merged.merge(localStats);
+                }
+                LOG.debug("Local Stats: " + accumulatorName);
+                LOG.debug(localStats.toString());
+            }
+        }
+
+        Assert.assertEquals(globalStats.cardinality, 999);
+        Assert.assertEquals(globalStats.estimateCountDistinct(), 100);
+        Assert.assertTrue(globalStats.getHeavyHitters().size() > 0 && globalStats.getHeavyHitters().size() <= 5);
+        Assert.assertEquals(merged.getMin(), globalStats.getMin());

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505292#comment-14505292
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user aalexandrov commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28799879
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.operatorstatistics;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class);
+
+    private static final String ACCUMULATOR_NAME = "op-stats";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+
+        String input = "";
+
+        Random rand = new Random();
+
+        for (int i = 1; i < 1000; i++) {
+            if (rand.nextDouble() < 0.2){
+                input += String.valueOf(rand.nextInt(5)) + "\n";
+            } else {
+                input += String.valueOf(rand.nextInt(100)) + "\n";
+            }
+        }
+
+        String inputFile = createTempFile("datapoints.txt", input);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.readTextFile(inputFile).
+            flatMap(new StringToInt()).
+            output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+        JobExecutionResult result = env.execute();
+
+        OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME);
+        LOG.debug("Global Stats");
+        LOG.debug(globalStats.toString());
+
+        OperatorStatistics merged = null;
+
+        Map<String, Object> accResults = result.getAllAccumulatorResults();
+        for (String accumulatorName : accResults.keySet()){
+            if (accumulatorName.contains(ACCUMULATOR_NAME + "-")){
+                OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName);
+                if (merged == null){
+                    merged = localStats.clone();
+                } else {
+                    merged.merge(localStats);
+                }
+                LOG.debug("Local Stats: " + accumulatorName);
+                LOG.debug(localStats.toString());
+            }
+        }
+
+        Assert.assertEquals(globalStats.cardinality, 999);
+        Assert.assertEquals(globalStats.estimateCountDistinct(), 100);
+        Assert.assertTrue(globalStats.getHeavyHitters().size() > 0 && globalStats.getHeavyHitters().size() <= 5);
+        Assert.assertEquals(merged.getMin(), globalStats.getMin());

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14505692#comment-14505692
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user aalexandrov commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28818529
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.operatorstatistics;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class);
+
+    private static final String ACCUMULATOR_NAME = "op-stats";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+
+        String input = "";
+
+        Random rand = new Random();
+
+        for (int i = 1; i < 1000; i++) {
+            if (rand.nextDouble() < 0.2){
+                input += String.valueOf(rand.nextInt(5)) + "\n";
+            } else {
+                input += String.valueOf(rand.nextInt(100)) + "\n";
+            }
+        }
+
+        String inputFile = createTempFile("datapoints.txt", input);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.readTextFile(inputFile).
+            flatMap(new StringToInt()).
+            output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+        JobExecutionResult result = env.execute();
+
+        OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME);
+        LOG.debug("Global Stats");
+        LOG.debug(globalStats.toString());
+
+        OperatorStatistics merged = null;
+
+        Map<String, Object> accResults = result.getAllAccumulatorResults();
+        for (String accumulatorName : accResults.keySet()){
+            if (accumulatorName.contains(ACCUMULATOR_NAME + "-")){
+                OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName);
+                if (merged == null){
+                    merged = localStats.clone();
+                } else {
+                    merged.merge(localStats);
+                }
+                LOG.debug("Local Stats: " + accumulatorName);
+                LOG.debug(localStats.toString());
+            }
+        }
+
+        Assert.assertEquals(globalStats.cardinality, 999);
+        Assert.assertEquals(globalStats.estimateCountDistinct(), 100);
+        Assert.assertTrue(globalStats.getHeavyHitters().size() > 0 && globalStats.getHeavyHitters().size() <= 5);
+        Assert.assertEquals(merged.getMin(), globalStats.getMin());

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503961#comment-14503961
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28739333
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/heavyhitters/IHeavyHitter.java
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics.heavyhitters;
+
+import java.util.HashMap;
+
+/**
+ * Interface for classes that track heavy hitters. It follows the same design as the interfaces from
+ * {@link com.clearspring.analytics.stream}.
+ */
+public interface IHeavyHitter {
--- End diff --

Interfaces do not follow a special naming convention in the Flink code base 
such as starting with a capital `I`. Please rename to `HeavyHitter` for 
consistency.
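
For illustration only, a sketch of what the renamed interface might look like; the method names 
below (addObject, merge, getHeavyHitters) are assumptions inferred from how the structure is used 
elsewhere in this PR, not the PR's exact signatures.

    package org.apache.flink.statistics.heavyhitters;

    import java.util.Map;

    // Renamed from IHeavyHitter: no "I" prefix, per Flink naming conventions.
    public interface HeavyHitter {

        /** Updates the sketch with one element. */
        void addObject(Object value);

        /** Merges another heavy-hitter sketch of the same kind into this one. */
        void merge(HeavyHitter other);

        /** Returns the current heavy hitters and their estimated frequencies. */
        Map<Object, Long> getHeavyHitters();
    }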


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14503979#comment-14503979
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28739811
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information about data that has been processed in one pass.
+ * This statistical information is meant to help determine the distribution of the data processed in an
+ * operator, so that we can decide whether it is necessary to repartition the data.
+ *
+ * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct,
+ * and a structure holding the heavy hitters along with their frequency.
+ */
+public class OperatorStatistics implements Serializable {
+
+    OperatorStatisticsConfig config;
+
+    Object min;
+    Object max;
+    ICardinality countDistinct;
+    IHeavyHitter heavyHitter;
+    long cardinality = 0;
+
+    public OperatorStatistics(OperatorStatisticsConfig config) {
+        this.config = config;
+        if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) {
+            countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+        }
+        if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)) {
+            countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)) {
+            heavyHitter =
+                new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)) {
+            heavyHitter =
+                new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+                    OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+                    OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+                    OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+        }
+    }
+
+    public void process(Object tupleObject){
--- End diff --

It looks like this method is expected to be called for each passing element if 
statistics collection is enabled. This could add significant processing 
overhead. Would it make sense to add an optional skip interval n that processes 
only 

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14502547#comment-14502547
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-94401122
  
Hey, thanks for the quick feedback. I agree with separating the 
contribution as a library; however, I have a question. The reason I used a 
specialized method in the RuntimeEnvironment is that, for the 
OperatorStatsAccumulator, we do not only want to track the accumulated 
statistics per job, but also keep an array of the statistics collected at each 
task. The subtaskIndex, which is part of the RuntimeEnvironment, is passed as a 
parameter to the accumulator's constructor so that we can track which 
subtask is associated with which stats. Any ideas on how I could use the same 
accumulator infrastructure and still associate an accumulated value with a 
given subtask?


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14502900#comment-14502900
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-94464757
  
Keeping the accumulators from each task separately is something that we 
want to change anyways in the main system. Then, this would not need special 
casing.

Until then, you should be able to encode the subtask into the accumulator 
name, like taskname_i. Then, the accumulators do not get combined.
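
A minimal sketch of that workaround: register one accumulator per subtask in open(), with the 
subtask index encoded in the name. A standard LongCounter stands in for the PR's statistics 
accumulator here; the registration pattern is the point, not the accumulator type, and the 
function name is illustrative.

    import org.apache.flink.api.common.accumulators.LongCounter;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class StringToIntWithStats extends RichFlatMapFunction<String, Tuple1<Integer>> {

        private static final String ACCUMULATOR_NAME = "op-stats";

        private final LongCounter localStats = new LongCounter();

        @Override
        public void open(Configuration parameters) {
            // "op-stats-0", "op-stats-1", ... : one accumulator per parallel subtask,
            // so the values are reported separately and never combined by name.
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            getRuntimeContext().addAccumulator(ACCUMULATOR_NAME + "-" + subtask, localStats);
        }

        @Override
        public void flatMap(String value, Collector<Tuple1<Integer>> out) {
            int i = Integer.parseInt(value.trim());
            localStats.add(1L);  // the PR's statistics sketch would be updated here instead
            out.collect(new Tuple1<>(i));
        }
    }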


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14501182#comment-14501182
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28642379
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.OperatorStatsAccumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statistics.OperatorStatistics;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final String ACCUMULATOR_NAME = "op-stats-accumulator";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() {
+
+        try {
+            String input = "";
+
+            Random rand = new Random();
+
+            for (int i = 1; i < 1000; i++) {
+                if (rand.nextDouble() < 0.2){
+                    input += String.valueOf(rand.nextInt(5)) + "\n";
+                } else {
+                    input += String.valueOf(rand.nextInt(100)) + "\n";
+                }
+            }
+
+            String inputFile = createTempFile("datapoints.txt", input);
+
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            env.readTextFile(inputFile).
+                flatMap(new StringToInt()).
+                output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+            JobExecutionResult result = env.execute();
+            System.out.println("Accumulator results:");
+
+            OperatorStatistics globalStats = result.getOperatorStatisticsResult(ACCUMULATOR_NAME);
+            System.out.println("Global Stats");
+            System.out.println(globalStats.toString());
+
+            OperatorStatistics[] localStats = result.getLocalOperatorStatisticsResults(ACCUMULATOR_NAME);
+            System.out.println("Local stats: 0");
+            System.out.println(localStats[0].toString());
+
+            OperatorStatistics merged = localStats[0].clone();
+            for (int i = 1; i < localStats.length; i++) {
+                merged.merge(localStats[i]);
+                System.out.println("Local stats: " + i);
+                System.out.println(localStats[i].toString());
+            }
+            Assert.assertEquals(merged.getMin(), globalStats.getMin());
+            Assert.assertEquals(merged.getMax(), globalStats.getMax());
+            Assert.assertEquals(merged.estimateCountDistinct(), globalStats.estimateCountDistinct());
+            Assert.assertEquals(merged.getHeavyHitters().size(), globalStats.getHeavyHitters().size());
+
+        }
+        catch (Exception e) {
--- End diff --

Tests fail anyways if an Exception is 

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499431#comment-14499431
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-93938006
  
I made some comments on the PR.
You can update it by pushing to the same branch.


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499424#comment-14499424
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575850
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/statistics/heavyhitters/LossyCountingTest.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics.heavyhitters;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+/*
+* Test the structure implemented for Lossy Counting
+*/
+
+public class LossyCountingTest {
+
+   static final double fraction = 0.05;
+   static final double error = 0.005;
+   static final int seed = 7362181;
+   static final Random r = new Random(seed);
+
+    @Test
+    public void testAccuracy() {
+
+        int numItems = 100;
+        long frequency = (int) Math.ceil(numItems * fraction);
+        long minFrequency = (int) Math.ceil(numItems * (fraction - error));
+
+        int[] xs = new int[numItems];
+        int maxScale = 20;
+
+        for (int i = 0; i < numItems; i++) {
+            double p = r.nextDouble();
+            if (p < 0.2){
+                xs[i] = r.nextInt(5);
+            } else {
+                int scale = r.nextInt(maxScale);
+                xs[i] = r.nextInt(1 << scale);
+            }
+        }
+
+        LossyCounting lossyCounting = new LossyCounting(fraction, error);
+
+        for (int x : xs) {
+            lossyCounting.addObject(x);
+        }
+
+        long[] actualFreq = new long[1 << maxScale];
+        for (int x : xs) {
+            actualFreq[x]++;
+        }
+
+        System.out.println("Size of heavy hitters: " + lossyCounting.heavyHitters.size());
--- End diff --

Can you use regular logging instead of System.out for these messages?
We have different logging configurations for Travis, local, and the IDE, but we 
cannot control System.out output with our logging settings.
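
For example, a small sketch of the suggested change (class name and message are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LossyCountingTestLogging {

        // SLF4J logger instead of System.out, so the Travis/local/IDE logging
        // configurations decide whether and where these messages appear.
        private static final Logger LOG = LoggerFactory.getLogger(LossyCountingTestLogging.class);

        static void report(int heavyHitterCount) {
            LOG.debug("Size of heavy hitters: {}", heavyHitterCount);
        }
    }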


 Add support for tracking statistics of intermediate results
 ---

 Key: FLINK-1297
 URL: https://issues.apache.org/jira/browse/FLINK-1297
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Runtime
Reporter: Alexander Alexandrov
Assignee: Alexander Alexandrov
 Fix For: 0.9

   Original Estimate: 1,008h
  Remaining Estimate: 1,008h

 One of the major problems related to the optimizer at the moment is the lack 
 of proper statistics.
 With the introduction of staged execution, it is possible to instrument the 
 runtime code with a statistics facility that collects the required 
 information for optimizing the next execution stage.
 I would therefore like to contribute code that can be used to gather basic 
 statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
 count distinct) and make them available to the job manager.
 Before I start, I would like to hear some feedback from the other users.
 In particular, to handle skew (e.g. on grouping) it might be good to have 
 some sort of detailed sketch about the key distribution of an intermediate 
 result. I am not sure whether a simple histogram is the most effective way to 
 go. Maybe somebody would propose another lightweight sketch that provides 
 better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499419#comment-14499419
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575727
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.OperatorStatsAccumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statistics.OperatorStatistics;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final String ACCUMULATOR_NAME = "op-stats-accumulator";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() {
+
+        try {
+            String input = "";
+
+            Random rand = new Random();
+
+            for (int i = 1; i < 1000; i++) {
+                if (rand.nextDouble() < 0.2){
+                    input += String.valueOf(rand.nextInt(5)) + "\n";
+                } else {
+                    input += String.valueOf(rand.nextInt(100)) + "\n";
+                }
+            }
+
+            String inputFile = createTempFile("datapoints.txt", input);
+
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            env.readTextFile(inputFile).
+                flatMap(new StringToInt()).
+                output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+            JobExecutionResult result = env.execute();
+            System.out.println("Accumulator results:");
+
+            OperatorStatistics globalStats = result.getOperatorStatisticsResult(ACCUMULATOR_NAME);
+            System.out.println("Global Stats");
+            System.out.println(globalStats.toString());
+
+            OperatorStatistics[] localStats = result.getLocalOperatorStatisticsResults(ACCUMULATOR_NAME);
+            System.out.println("Local stats: 0");
+            System.out.println(localStats[0].toString());
+
+            OperatorStatistics merged = localStats[0].clone();
+            for (int i = 1; i < localStats.length; i++) {
+                merged.merge(localStats[i]);
+                System.out.println("Local stats: " + i);
+                System.out.println(localStats[i].toString());
+            }
+            Assert.assertEquals(merged.getMin(), globalStats.getMin());
+            Assert.assertEquals(merged.getMax(), globalStats.getMax());
+            Assert.assertEquals(merged.estimateCountDistinct(), globalStats.estimateCountDistinct());
+            Assert.assertEquals(merged.getHeavyHitters().size(), globalStats.getHeavyHitters().size());
+
+        }
+        catch (Exception e) {
--- End diff --

I would fail the test in case of an 

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499429#comment-14499429
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575968
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information about data that has been processed in one pass.
+ * This statistical information is meant to help determine the distribution of the data processed in an
+ * operator, so that we can decide whether it is necessary to repartition the data.
+ *
+ * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct,
+ * and a structure holding the heavy hitters along with their frequency.
+ */
+public class OperatorStatistics implements Serializable {
+
+    OperatorStatisticsConfig config;
+
+    Object min;
+    Object max;
+    ICardinality countDistinct;
+    IHeavyHitter heavyHitter;
+    long cardinality = 0;
+
+    public OperatorStatistics(OperatorStatisticsConfig config) {
+        this.config = config;
+        if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) {
+            countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+        }
+        if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)) {
+            countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)) {
+            heavyHitter =
+                new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)) {
+            heavyHitter =
+                new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+                    OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+                    OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+                    OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+        }
+    }
+
+    public void process(Object tupleObject){
+        if (tupleObject instanceof Comparable) {
+            if (config.collectMin && (min == null || ((Comparable) tupleObject).compareTo(min) < 0)) {
+                min = tupleObject;
+            }

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499428#comment-14499428
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575964
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information of data that has only been processed by one pass
+ * This statistical information is meant to help determine the distribution of the data that has been processed
+ * in an operator so that we can determine if it is necessary to repartition the data
+ *
+ * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a
+ * structure holding the heavy hitters along with their frequency.
+ *
+ */
+public class OperatorStatistics implements Serializable {
+
+   OperatorStatisticsConfig config;
+
+   Object min;
+   Object max;
+   ICardinality countDistinct;
+   IHeavyHitter heavyHitter;
+   long cardinality = 0;
+
+   public OperatorStatistics(OperatorStatisticsConfig config) {
+   this.config = config;
+   if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) {
+      countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+   }
+   if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)) {
+      countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+   }
+   if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)) {
+      heavyHitter =
+         new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+   }
+   if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)) {
+      heavyHitter =
+         new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+            OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+            OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+            OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+   }
+   }
+
+   public void process(Object tupleObject){
+   if (tupleObject instanceof Comparable) {
+      if (config.collectMin && (min == null || ((Comparable) tupleObject).compareTo(min) < 0)) {
+   min = tupleObject;
+   }
+  

[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499790#comment-14499790
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28591824
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.7.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
--- End diff --

We have so far refrained from including fastutil, as it is a huge 
dependency, and one usually needs only a minimal subset of the functionality. 
Can you elaborate what this is needed for?


> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499809#comment-14499809
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-93984865
  
I think this is nice work.

It would be easier to add and maintain this if it were not hardwired into 
the core Flink classes but shipped as a library instead. This would be 
a good example for the `flink-contrib` project.

The changes for this should be rather small: It would boil down to not 
adding the specialized methods for the `OperatorStatistics` to the 
RuntimeEnvironment and the `JobExecutionResult`, but to add a util class that 
converts an operator statistics accumulator into the OperatorStatistics object.

In general, I am a bit more comfortable with adding stuff as libraries than 
to the core API. A lot of requests and features are coming in; if we add them 
all to the core API, it will easily become unmaintainable. Staging something 
first as a library is pretty uncritical. We can always add it to the API later, 
if it becomes stable and heavily used.
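
A util class along these lines could stay very small. The sketch below is hypothetical: OperatorStatisticsUtils and the accumulator name handling are made up for illustration, and the only existing API it relies on is JobExecutionResult.getAccumulatorResult(String):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.statistics.OperatorStatistics;

    // Hypothetical helper for a flink-contrib style library: extracts the collected
    // statistics from the job result instead of adding methods to the core API.
    public final class OperatorStatisticsUtils {

        private OperatorStatisticsUtils() {
        }

        /**
         * Returns the OperatorStatistics registered under the given accumulator name,
         * or null if the job did not register such an accumulator.
         */
        public static OperatorStatistics getStatistics(JobExecutionResult result, String accumulatorName) {
            Object value = result.getAccumulatorResult(accumulatorName);
            return (value instanceof OperatorStatistics) ? (OperatorStatistics) value : null;
        }
    }

With such a helper, neither the RuntimeEnvironment nor the JobExecutionResult needs statistics-specific methods.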


> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499815#comment-14499815
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28593066
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.7.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
--- End diff --

This is an exclusion of fastutil ;)


> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499816#comment-14499816
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user tammymendt commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28593077
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.7.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
--- End diff --

The library we are using from clearspring implements a series of streaming 
algorithms. We are only using HyperLogLog and CountMinSketch to estimate count 
distinct and heavy hitters respectively. As far as I could see fastutil is used 
only in the implementation of a separate streaming algorithm to track 
histograms (QDigest). Since we are not using this particular algorithm, it 
should be ok to exclude fastutil from the dependency. 
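
Both structures are also mergeable, which is what makes them usable for per-partition collection that is later combined on the job manager. A minimal example of the HyperLogLog side, with illustrative values only:

    import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
    import com.clearspring.analytics.stream.cardinality.HyperLogLog;
    import com.clearspring.analytics.stream.cardinality.ICardinality;

    // Illustrative only: per-partition HyperLogLog sketches merged into one global estimate.
    public class CountDistinctMergeExample {

        public static void main(String[] args) throws CardinalityMergeException {
            HyperLogLog partition1 = new HyperLogLog(10);
            HyperLogLog partition2 = new HyperLogLog(10);

            for (int i = 0; i < 1000; i++) {
                partition1.offer("key-" + i);         // keys 0..999
                partition2.offer("key-" + (i + 500)); // keys 500..1499, half overlapping
            }

            // merge() requires the same log2m on both sides and returns a new estimator.
            ICardinality merged = partition1.merge(partition2);
            System.out.println("estimated distinct keys: " + merged.cardinality()); // close to 1500
        }
    }

stream-lib's CountMinSketch offers a corresponding static merge(...) for the frequency side.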


> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499835#comment-14499835
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-93987722
  
+1 for Stephan's suggestion to make it a library in flink-contrib.


> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499833#comment-14499833
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28593902
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.7.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>it.unimi.dsi</groupId>
+          <artifactId>fastutil</artifactId>
--- End diff --

Ignore my comment. Robert is right, I overlooked that it is an exclusion, 
not a dependency...


> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-04-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14498283#comment-14498283
 ] 

ASF GitHub Bot commented on FLINK-1297:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28528023
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
    </dependency>
+
+    <dependency>
--- End diff --

Seems to be ASL 2.0 
(https://github.com/addthis/stream-lib/blob/master/LICENSE.txt) -- good


> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1297) Add support for tracking statistics of intermediate results

2015-01-23 Thread Fridtjof Sander (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14289511#comment-14289511
 ] 

Fridtjof Sander commented on FLINK-1297:


I started to prototype the design for this and pushed my first early result 
into the FLINK-1297 branch in our fork:
https://github.com/stratosphere/flink/tree/FLINK-1297

> Add support for tracking statistics of intermediate results
> ---
>
> Key: FLINK-1297
> URL: https://issues.apache.org/jira/browse/FLINK-1297
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Runtime
>Reporter: Alexander Alexandrov
>Assignee: Alexander Alexandrov
> Fix For: 0.9
>
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> One of the major problems related to the optimizer at the moment is the lack 
> of proper statistics.
> With the introduction of staged execution, it is possible to instrument the 
> runtime code with a statistics facility that collects the required 
> information for optimizing the next execution stage.
> I would therefore like to contribute code that can be used to gather basic 
> statistics for the (intermediate) result of dataflows (e.g. min, max, count, 
> count distinct) and make them available to the job manager.
> Before I start, I would like to hear some feedback form the other users.
> In particular, to handle skew (e.g. on grouping) it might be good to have 
> some sort of detailed sketch about the key distribution of an intermediate 
> result. I am not sure whether a simple histogram is the most effective way to 
> go. Maybe somebody would propose another lightweight sketch that provides 
> better accuracy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)