[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138847826
  
After rebasing the PR to the current master, the 
`OperatorStatsAccumulatorTest` is failing. In the original PR (based on a 
master from the beginning of August) the test is passing. @mxm, you are more 
familiar with Flink's accumulators. Can you have a look? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138895774
  
Yes, I'm on it

On Wed, Sep 9, 2015 at 2:34 PM, Max wrote:

> Could you update the pull request with the new clone() method? I will
> then merge the pull request if it passes Travis.



-- 
Tamara Mendt





[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138863134
  
Hi @tammymendt. Thanks for the pull request! The accumulators work a little 
bit differently now because they are accumulated on a per-task basis and 
reported to the job manager at regular intervals.

The `clone()` method in `OperatorStatistics` doesn't create a deep copy of 
the object, i.e. some references are reused. That causes problems when merging 
the accumulators because runtime accumulators are modified while merging 
accumulators for sending them to the job manager.
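
The aliasing problem can be illustrated with a small, self-contained sketch. 
`DistinctTracker` and `CloneAliasingDemo` are hypothetical stand-ins, not 
classes from this PR or from Flink; the sketch only assumes that a snapshot is 
taken with a copy method and then merged in place.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for an accumulator value; not the PR's OperatorStatistics.
final class DistinctTracker {
    Set<Integer> seen = new HashSet<>();

    void add(int value) {
        seen.add(value);
    }

    // Shallow copy: the copy shares the same Set instance as the original.
    DistinctTracker shallowCopy() {
        DistinctTracker copy = new DistinctTracker();
        copy.seen = this.seen;
        return copy;
    }

    // Deep copy: the copy owns an independent Set.
    DistinctTracker deepCopy() {
        DistinctTracker copy = new DistinctTracker();
        copy.seen = new HashSet<>(this.seen);
        return copy;
    }

    // Merging mutates the receiver in place.
    void merge(DistinctTracker other) {
        seen.addAll(other.seen);
    }
}

final class CloneAliasingDemo {
    // Size of the runtime tracker after a snapshot of it was merged with
    // another task's data.
    static int runtimeSizeAfterSnapshotMerge(boolean deep) {
        DistinctTracker runtime = new DistinctTracker();
        runtime.add(1);

        DistinctTracker otherTask = new DistinctTracker();
        otherTask.add(2);
        otherTask.add(3);

        DistinctTracker snapshot = deep ? runtime.deepCopy() : runtime.shallowCopy();
        snapshot.merge(otherTask);
        return runtime.seen.size();
    }

    public static void main(String[] args) {
        System.out.println("shallow: " + runtimeSizeAfterSnapshotMerge(false));
        System.out.println("deep: " + runtimeSizeAfterSnapshotMerge(true));
    }
}
```

With the shallow copy, merging into the snapshot also changes the runtime 
object because both share one `Set`. With the deep copy the runtime object is 
left untouched.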

I could make the test pass with a nasty deep copy using Java serialization. 
However, I didn't manage to make a proper copy using the provided interfaces. 
I think you can probably do that faster because you know the code very well.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138883419
  
The test I wrote checks if the final accumulator result is the one that is 
expected. If I understand correctly, now accumulator values are sent 
periodically before a task finishes. Do I need to write an additional test to 
make sure that accumulator results before task end are also as expected? If not 
I would say it is ok to merge. Have you written extra tests for other 
accumulators like IntCounter?




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138852658
  
I can have a look at it this afternoon. I suspect it might be because the
function getAllAccumulators has been deprecated.
On Sep 9, 2015 11:11 AM, "Fabian Hueske" wrote:

> After rebasing the PR to the current master, the
> OperatorStatsAccumulatorTest is failing. In the original PR (based on a
> master from the beginning of August) the test is passing. @mxm, you are
> more familiar with Flink's accumulators. Can you have a look? Thanks!





[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138895569
  
Could you update the pull request with the new `clone()` method? I will 
then merge the pull request if it passes Travis.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138866235
  
I tried again, this works:

```java
@Override
public OperatorStatistics clone() {
    OperatorStatistics clone = new OperatorStatistics(config);
    clone.min = min;
    clone.max = max;
    clone.cardinality = cardinality;

    // Copy the count-distinct sketch by merging it with a fresh, empty instance.
    try {
        ICardinality copy;
        if (countDistinct instanceof LinearCounting) {
            copy = new LinearCounting(config.getCountDbitmap());
        } else if (countDistinct instanceof HyperLogLog) {
            copy = new HyperLogLog(config.getCountDlog2m());
        } else {
            throw new IllegalStateException("Unsupported counter.");
        }
        clone.countDistinct = copy.merge(countDistinct);
    } catch (CardinalityMergeException e) {
        throw new RuntimeException("Failed to clone OperatorStatistics!");
    }

    // Copy the heavy-hitter structure the same way.
    try {
        HeavyHitter copy;
        if (heavyHitter instanceof LossyCounting) {
            copy = new LossyCounting(config.getHeavyHitterFraction(), config.getHeavyHitterError());
        } else if (heavyHitter instanceof CountMinHeavyHitter) {
            copy = new CountMinHeavyHitter(config.getHeavyHitterFraction(),
                    config.getHeavyHitterError(),
                    config.getHeavyHitterConfidence(),
                    config.getHeavyHitterSeed());
        } else {
            throw new IllegalStateException("Unsupported counter.");
        }
        copy.merge(heavyHitter);
        clone.heavyHitter = copy;
    } catch (HeavyHitterMergeException e) {
        throw new RuntimeException("Failed to clone OperatorStatistics!");
    }

    return clone;
}
```

Do you think we could merge your pull request with this change?




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138906837
  
I rebased with master and included the new clone() method. I had to include 
extra conditionals that check whether count distinct or heavy hitters are being 
tracked because otherwise the clone method would throw an exception in some 
cases in which it shouldn't. Also, I added some tests to the 
OperatorStatsAccumulatorTest class because I realized that I had not written 
tests for different configurations of the OperatorStatsConfig class.
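
A minimal sketch of what such guards might look like, assuming an untracked 
statistic is represented by a `null` field. `OptionalStats` and its fields are 
hypothetical and use exact JDK collections; they are not the PR's 
`OperatorStatistics` or its sketch structures.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical statistics holder in which each statistic can be switched off.
final class OptionalStats {
    // null means "this statistic is not being tracked"
    Set<Long> distinct;
    Map<Long, Long> frequencies;
    long cardinality;

    OptionalStats(boolean trackDistinct, boolean trackFrequencies) {
        if (trackDistinct) {
            distinct = new HashSet<>();
        }
        if (trackFrequencies) {
            frequencies = new HashMap<>();
        }
    }

    void process(long value) {
        cardinality++;
        if (distinct != null) {
            distinct.add(value);
        }
        if (frequencies != null) {
            frequencies.merge(value, 1L, Long::sum);
        }
    }

    // Deep copy that only touches the structures that are actually tracked,
    // so an untracked statistic never causes an exception.
    OptionalStats copy() {
        OptionalStats result = new OptionalStats(distinct != null, frequencies != null);
        result.cardinality = cardinality;
        if (distinct != null) {
            result.distinct.addAll(distinct);
        }
        if (frequencies != null) {
            result.frequencies.putAll(frequencies);
        }
        return result;
    }

    public static void main(String[] args) {
        OptionalStats stats = new OptionalStats(true, false);
        stats.process(5L);
        stats.process(7L);
        OptionalStats copied = stats.copy();
        System.out.println(copied.distinct + " " + copied.frequencies);
    }
}
```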




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138937490
  
Looks good. Merging this now.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138939760
  
Thanks @tammymendt for the PR and the last fixes. Sorry that it took us so 
long to merge it :-/




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/605




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-09-08 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-138709021
  
Hi @tammymendt, it's been a while again. Sorry!
I'll try to rebase and merge this PR tomorrow.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r37192039
  
--- Diff: flink-contrib/flink-operator-stats/pom.xml ---
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-contrib-parent</artifactId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-operator-stats</artifactId>
+    <name>flink-operator-stats</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.clearspring.analytics</groupId>
--- End diff --

ASL 2.0 license, that's good.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-131840098
  
+1 for merging.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-05 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127965139
  
Yes, if there is commitment to maintain the code and not just dump it, 
it is good to merge.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-05 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127923972
  
I will maintain the code. Though I am not committing other code to Flink, I 
am responsible for what I coded and will not bail as soon as it is merged. I 
think this feature might be useful for other people, but if you prefer not to 
merge it I can close the PR and just leave it on my branch.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-05 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127937557
  
I think there is little risk in merging it to `flink-contrib`. We made that 
place exactly for contributions like this one. If we find that the code is 
popular and used by many people, we can also move it.

In general, I am +1 for adding this feature.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-04 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127519735
  
Hey! So I've been using and testing this code throughout my master's thesis. 
Collecting count distinct makes jobs about 10% slower, whereas collecting heavy 
hitters can make a job 20 to 50% slower (depending on the algorithm and the 
distribution of the data). However, this overhead is lower than that of using a 
histogram accumulator (not to mention that the histogram might not fit in 
memory). I think it can be a nice addition to the code, especially since it 
does not affect any core components.
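
For readers unfamiliar with the count-distinct structures mentioned here, the 
following is a rough sketch of the linear counting idea: hash every element 
into a fixed-size bitmap and estimate the number of distinct values from the 
fraction of bits that are still unset. The PR itself relies on the 
`LinearCounting` and `HyperLogLog` classes from `com.clearspring.analytics`; 
`LinearCountingSketch` below is a hypothetical, simplified illustration and not 
that library's implementation. The bitmap size and the hash mixing function are 
assumptions chosen for the example.

```java
import java.util.BitSet;

// Simplified linear counting estimator (illustrative only).
final class LinearCountingSketch {
    private final int m;        // number of bits in the bitmap
    private final BitSet bits;

    LinearCountingSketch(int m) {
        this.m = m;
        this.bits = new BitSet(m);
    }

    // Hash the element and set the corresponding bit. Duplicates hit the
    // same bit, so memory stays constant regardless of input size.
    void offer(Object element) {
        int hash = mix(element.hashCode());
        bits.set(Math.floorMod(hash, m));
    }

    // Estimate n as -m * ln(V), where V is the fraction of unset bits.
    long estimate() {
        int unset = m - bits.cardinality();
        if (unset == 0) {
            // Bitmap saturated: the formula is undefined, report m as a lower bound.
            return m;
        }
        return Math.round(-m * Math.log((double) unset / m));
    }

    // Integer mixing step (MurmurHash3 finalizer) to spread similar hash codes.
    private static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    public static void main(String[] args) {
        LinearCountingSketch sketch = new LinearCountingSketch(1 << 16);
        for (int i = 0; i < 1000; i++) {
            sketch.offer(i);
        }
        System.out.println("estimated distinct values: " + sketch.estimate());
    }
}
```

The per-element cost is one hash and one bit write, which is the kind of work 
behind the overhead figures quoted in this comment; the real sketches do more 
per element than this toy version.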

The version that I pushed now uses a bunch of conditionals to check which 
statistic is being collected. I know @fhueske did not really like this. I 
implemented another version which avoids the conditionals by using a different 
class for every type of statistic. I preferred to push this version though, 
since it has been more thoroughly tested.
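
For illustration, the alternative design with one class per statistic could 
look roughly like the sketch below. The names `TrackedStatistic`, 
`MinStatistic`, `CountStatistic` and `StatisticsCollector` are hypothetical and 
are not taken from either version of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface: one implementation per statistic replaces the
// conditionals inside a single statistics class.
interface TrackedStatistic {
    void process(long value);

    // Assumes 'other' is the same kind of statistic as the receiver.
    void merge(TrackedStatistic other);

    long result();
}

final class MinStatistic implements TrackedStatistic {
    private long min = Long.MAX_VALUE;

    @Override
    public void process(long value) {
        min = Math.min(min, value);
    }

    @Override
    public void merge(TrackedStatistic other) {
        min = Math.min(min, other.result());
    }

    @Override
    public long result() {
        return min;
    }
}

final class CountStatistic implements TrackedStatistic {
    private long count;

    @Override
    public void process(long value) {
        count++;
    }

    @Override
    public void merge(TrackedStatistic other) {
        count += other.result();
    }

    @Override
    public long result() {
        return count;
    }
}

// Only the statistics that were registered are updated; no per-element checks.
final class StatisticsCollector {
    private final List<TrackedStatistic> tracked = new ArrayList<>();

    StatisticsCollector track(TrackedStatistic statistic) {
        tracked.add(statistic);
        return this;
    }

    void process(long value) {
        for (TrackedStatistic statistic : tracked) {
            statistic.process(value);
        }
    }
}
```

The trade-off is the one named in this comment: more classes, but the choice of 
statistics is made once at construction time instead of on every element.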




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-04 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127552585
  
It is a nice addition, I agree.

The question for all code that we add is who can maintain it. Code never 
gets just added and then simply works and can stay. Any code that is added 
needs to be maintained by someone, be that a committer, or by a commitment from 
the contributor.

If we can find a maintainer, then I am fine with adding this.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-08-04 Thread aalexandrov
Github user aalexandrov commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-127558961
  
@StephanEwen To be honest I'm kind of puzzled and somewhat annoyed that a 
PR that

1) adds a feature that has been on the list for at least 2 years,
2) has been tested and evaluated thoroughly, and
3) has been lying around since April and was rebased at least three times

is still open. 

I was also not aware that there is a strict maintainer policy for 
`flink-contrib` commits. IMHO this is a good place to expose non-critical / 
exploratory / unstable features that might be of benefit for multiple users to 
the project. If these turn out to be useful for enough people, I'm sure there 
will be incentive to maintain them. Otherwise there is always the option to 
deprecate and clean up recent additions to the contrib prior to the next 
release if nobody picked up on them.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-06-10 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-110702228
  
Everything seems to be OK now. Is there still a chance to merge this before 
the upcoming release?

I have pushed two commits. The first is the original version with the bug 
fixes. The second is a re-factored version that makes the code more flexible 
and extensible. The second version has a couple more classes than the first 
though. I have pushed both in separate commits so you can decide which version 
you think is preferable. 

Cheers,

Tamara




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-06-10 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-110715816
  
Hi @tammymendt 

thanks for the update! I'm afraid we are already in the testing phase for 
the release and don't want to add new features. Since we postponed a few 
important features to the next release, we will push hard to get the next 
release out rather soon.

I'll have a look at the PR in detail after the release stress is over.

Fabian




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-06-10 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-110720993
  
Sounds good! Thanks =)




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-06-08 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109990676
  
Hey! 
So I fixed the NullPointerException, and as far as I can see it no longer 
occurs. Given the non-deterministic nature of the algorithms, some tests fail 
sometimes. I am looking into this. I am considering leaving out tests for the 
accuracy of the algorithms (in the Accumulator test), given that they are 
already covered in other test classes. This should avoid these errors related 
to algorithm accuracy in the OperatorStatisticsAccumulatorTest class.
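
One common way to tame this kind of flakiness is to seed the random input and 
to compare estimates against a tolerance instead of an exact value. The sketch 
below is only an illustration of that idea under assumed names 
(`SeededInputDemo`, `generate`, `withinRelativeError`); the seed, the skew 
parameters and the tolerance are hypothetical and not taken from the PR's 
tests.

```java
import java.util.Random;

final class SeededInputDemo {
    // Generates skewed test data. With a fixed seed the same sequence is
    // produced on every run, so a failure can be reproduced.
    static int[] generate(long seed, int size) {
        Random rand = new Random(seed);
        int[] data = new int[size];
        for (int i = 0; i < size; i++) {
            // Roughly one fifth of the values come from a small range to create skew.
            data[i] = rand.nextDouble() < 0.2 ? rand.nextInt(5) : rand.nextInt(100);
        }
        return data;
    }

    // True if the estimate deviates from the exact value by at most the
    // given relative tolerance (for example 0.1 for 10%).
    static boolean withinRelativeError(long estimate, long exact, double tolerance) {
        return Math.abs(estimate - exact) <= tolerance * exact;
    }

    public static void main(String[] args) {
        int[] first = generate(42L, 999);
        int[] second = generate(42L, 999);
        System.out.println("same input on both runs: " + java.util.Arrays.equals(first, second));
        System.out.println("105 vs 100 within 10%: " + withinRelativeError(105, 100, 0.1));
    }
}
```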

Also, I have been refactoring this code to avoid using the conditionals 
that check which statistics are being collected in the process and merge 
methods. This is based on a remark that @fhueske made way earlier in the PR. 
Should I try and make a PR for the refactored code, or rather leave this in its 
current state and wait until it is merged before I make a PR for the refactored 
version?

Cheers!




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-06-06 Thread tammymendt
Github user tammymendt commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109537258
  
Ok, I will check this out today.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-06-05 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109446516
  
I am merging this for the next version.
Very nice addition, sorry for the delay.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-06-05 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-109490490
  
The tests seem to be non-deterministic and fail frequently.

Check out this build: 
https://travis-ci.org/StephanEwen/incubator-flink/jobs/65634990

The tests need to be more stable before we can add this to the codebase.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-21 Thread tammymendt
Github user tammymendt commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28776656
  
--- Diff: flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information of data that has only been processed by one pass
+ * This statistical information is meant to help determine the distribution of the data that has been processed
+ * in an operator so that we can determine if it is necessary to repartition the data
+ *
+ * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a
+ * structure holding the heavy hitters along with their frequency.
+ *
+ */
+public class OperatorStatistics implements Serializable {
+
+   OperatorStatisticsConfig config;
+
+   Object min;
+   Object max;
+   ICardinality countDistinct;
+   IHeavyHitter heavyHitter;
+   long cardinality = 0;
+
+   public OperatorStatistics(OperatorStatisticsConfig config) {
+       this.config = config;
+       if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) {
+           countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+       }
+       if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){
+           countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+       }
+       if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){
+           heavyHitter =
+               new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+       }
+       if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){
+           heavyHitter =
+               new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+                   OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+                   OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+                   OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+       }
+   }
+
+   public void process(Object tupleObject){
--- End diff --

The problem with processing only every n-th element is that precision would 
be affected, and the algorithms are already estimating as it is. I am planning 
to measure performance overhead but haven't done it so far. 
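
To make the precision concern concrete, here is a small sketch that counts 
distinct values while only looking at every n-th element. It uses an exact 
`HashSet` rather than the estimators from the PR, and `SamplingPrecisionDemo` 
and `distinctWithStride` are hypothetical names, so it only illustrates the 
effect of skipping elements, not the behaviour of the actual algorithms.

```java
import java.util.HashSet;
import java.util.Set;

final class SamplingPrecisionDemo {
    // Counts distinct values while only inspecting every 'stride'-th element.
    static int distinctWithStride(int[] data, int stride) {
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < data.length; i += stride) {
            seen.add(data[i]);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // 100 elements, each value occurring exactly once.
        int[] data = new int[100];
        for (int i = 0; i < data.length; i++) {
            data[i] = i;
        }
        System.out.println("every element:      " + distinctWithStride(data, 1));
        System.out.println("every 10th element: " + distinctWithStride(data, 10));
    }
}
```

Values that occur rarely are simply never seen when elements are skipped, and 
that error would come on top of the estimation error of the sketches.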

I have removed unnecessary checks inside the process function. The point 
of them was to allow the user to configure which statistics should be tracked, 
by means of the OperatorStatisticsConfig class. This way for example an 
OperatorStatisticsAccumulator could 

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-21 Thread tammymendt
Github user tammymendt commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28789034
  
--- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.operatorstatistics;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorsTest.class);
+
+    private static final String ACCUMULATOR_NAME = "op-stats";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+
+        String input = "";
+
+        Random rand = new Random();
+
+        for (int i = 1; i < 1000; i++) {
+            if(rand.nextDouble()<0.2){
+                input+=String.valueOf(rand.nextInt(5))+"\n";
+            }else{
+                input+=String.valueOf(rand.nextInt(100))+"\n";
+            }
+        }
+
+        String inputFile = createTempFile("datapoints.txt", input);
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        env.readTextFile(inputFile).
+                flatMap(new StringToInt()).
+                output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+        JobExecutionResult result = env.execute();
+
+        OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME);
+        LOG.debug("Global Stats");
+        LOG.debug(globalStats.toString());
+
+        OperatorStatistics merged = null;
+
+        Map<String,Object> accResults = result.getAllAccumulatorResults();
+        for (String accumulatorName:accResults.keySet()){
+            if (accumulatorName.contains(ACCUMULATOR_NAME+"-")){
+                OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName);
+                if (merged == null){
+                    merged = localStats.clone();
+                }else {
+                    merged.merge(localStats);
+                }
+                LOG.debug("Local Stats: " + accumulatorName);
+                LOG.debug(localStats.toString());
+            }
+        }
+
+        Assert.assertEquals(globalStats.cardinality,999);
+        Assert.assertEquals(globalStats.estimateCountDistinct(),100);
+        Assert.assertTrue(globalStats.getHeavyHitters().size()>0 && globalStats.getHeavyHitters().size()<=5);
+        Assert.assertEquals(merged.getMin(),globalStats.getMin());
+        Assert.assertEquals(merged.getMax(),globalStats.getMax());
+        Assert.assertEquals(merged.estimateCountDistinct(),globalStats.estimateCountDistinct());
+        Assert.assertEquals(merged.getHeavyHitters().size(),globalStats.getHeavyHitters().size());
+
+    }

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-21 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28816129
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,144 @@

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-21 Thread aalexandrov
Github user aalexandrov commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28818529
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,144 @@

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-21 Thread aalexandrov
Github user aalexandrov commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28799879
  
--- Diff: 
flink-contrib/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,144 @@

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28739333
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/heavyhitters/IHeavyHitter.java
 ---
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics.heavyhitters;
+
+import java.util.HashMap;
+
+/**
+ * Interface for classes that track heavy hitters. It follows the same design as the interfaces from
+ * {@link com.clearspring.analytics.stream}
+ */
+public interface IHeavyHitter {
--- End diff --

Interfaces do not follow a special naming convention in the Flink code base 
such as starting with a capital `I`. Please rename to `HeavyHitter` for 
consistency.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28739811
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information of data that has only been processed by one pass
+ * This statistical information is meant to help determine the distribution of the data that has been processed
+ * in an operator so that we can determine if it is necessary to repartition the data
+ *
+ * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a
+ * structure holding the heavy hitters along with their frequency.
+ *
+ */
+public class OperatorStatistics implements Serializable {
+
+    OperatorStatisticsConfig config;
+
+    Object min;
+    Object max;
+    ICardinality countDistinct;
+    IHeavyHitter heavyHitter;
+    long cardinality = 0;
+
+    public OperatorStatistics(OperatorStatisticsConfig config) {
+        this.config = config;
+        if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) {
+            countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+        }
+        if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){
+            countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){
+            heavyHitter =
+                    new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+        }
+        if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){
+            heavyHitter =
+                    new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+                            OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+                            OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+                            OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+        }
+    }
+
+    public void process(Object tupleObject){
--- End diff --

It looks like this method is expected to be called for each passing element if 
statistics collection is enabled. This could add significant processing 
overhead. Would it make sense to add an optional skip interval n that processes 
only every n-th element (except for increasing the `cardinality` counter)?

Also it makes sense to perform as many checks outside of the function as 
possible. The type of the data elements is known before execution. So it can be 
checked if a data type 
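
The skip interval suggested above could look roughly like the following. This is only a sketch under stated assumptions: `SampledStatistics` and its fields are hypothetical names, not part of the pull request, and plain `long` values with min/max stand in for the real sketches (count distinct, heavy hitters).

```java
/**
 * Hypothetical illustration of a skip interval: every element increments
 * the cardinality counter, but only every n-th element reaches the more
 * expensive statistics update (min/max stand in for the real structures).
 */
final class SampledStatistics {

    private final int skipInterval; // 1 means "process every element"
    private long cardinality = 0;   // counts all elements seen
    private long processed = 0;     // counts elements that updated the statistics
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    SampledStatistics(int skipInterval) {
        if (skipInterval < 1) {
            throw new IllegalArgumentException("skipInterval must be >= 1");
        }
        this.skipInterval = skipInterval;
    }

    void process(long value) {
        cardinality++;
        // Elements 1, 1 + n, 1 + 2n, ... are processed; all others are skipped.
        if ((cardinality - 1) % skipInterval != 0) {
            return;
        }
        processed++;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    long getCardinality() { return cardinality; }
    long getProcessed() { return processed; }
    long getMin() { return min; }
    long getMax() { return max; }
}
```

With an interval of 3 and ten input elements, the counter reaches 10 while only four elements update the statistics. The trade-off is that min, max and the sketches become estimates over a sample rather than over the full input.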

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-20 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-94464757
  
Keeping the accumulators from each task separately is something that we 
want to change anyways in the main system. Then, this would not need special 
casing.

Until then, you should be able to encode the subtask into the accumulator 
name, like taskname_i. Then, the accumulators do not get combined.
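
A minimal sketch of the naming scheme described above, assuming an underscore separator as in `taskname_i`. The class `SubtaskAccumulatorNames` and both helper methods are hypothetical; in a rich function the index would presumably come from `getRuntimeContext().getIndexOfThisSubtask()`, and the resulting name would be passed to `addAccumulator`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helpers for per-subtask accumulator names. */
final class SubtaskAccumulatorNames {

    private SubtaskAccumulatorNames() {}

    /** Builds a name such as "op-stats_3" so that subtask results are not combined. */
    static String forSubtask(String baseName, int subtaskIndex) {
        return baseName + "_" + subtaskIndex;
    }

    /** Selects the per-subtask entries for one base name from all accumulator results. */
    static Map<String, Object> selectLocal(String baseName, Map<String, Object> allResults) {
        String prefix = baseName + "_";
        Map<String, Object> local = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : allResults.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                local.put(entry.getKey(), entry.getValue());
            }
        }
        return local;
    }
}
```

Because each subtask registers under a distinct name, the per-subtask results can then be picked out of the full result map by prefix, which is what the test in this pull request does with its own separator.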




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-18 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28642379
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.OperatorStatsAccumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statistics.OperatorStatistics;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+    private static final String ACCUMULATOR_NAME = "op-stats-accumulator";
+
+    public OperatorStatsAccumulatorsTest(){
+        super(new Configuration());
+    }
+
+    @Test
+    public void testAccumulator() {
+
+        try {
+            String input = "";
+
+            Random rand = new Random();
+
+            for (int i = 1; i < 1000; i++) {
+                if(rand.nextDouble()<0.2){
+                    input+=String.valueOf(rand.nextInt(5))+"\n";
+                }else{
+                    input+=String.valueOf(rand.nextInt(100))+"\n";
+                }
+            }
+
+            String inputFile = createTempFile("datapoints.txt", input);
+
+            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            env.readTextFile(inputFile).
+                    flatMap(new StringToInt()).
+                    output(new DiscardingOutputFormat<Tuple1<Integer>>());
+
+            JobExecutionResult result = env.execute();
+            System.out.println("Accumulator results:");
+
+            OperatorStatistics globalStats = result.getOperatorStatisticsResult(ACCUMULATOR_NAME);
+            System.out.println("Global Stats");
+            System.out.println(globalStats.toString());
+
+            OperatorStatistics[] localStats = result.getLocalOperatorStatisticsResults(ACCUMULATOR_NAME);
+            System.out.println("Local stats: 0");
+            System.out.println(localStats[0].toString());
+
+            OperatorStatistics merged = localStats[0].clone();
+            for (int i=1;i<localStats.length;i++) {
+                merged.merge(localStats[i]);
+                System.out.println("Local stats: "+i);
+                System.out.println(localStats[i].toString());
+            }
+            Assert.assertEquals(merged.getMin(),globalStats.getMin());
+            Assert.assertEquals(merged.getMax(),globalStats.getMax());
+            Assert.assertEquals(merged.estimateCountDistinct(),globalStats.estimateCountDistinct());
+            Assert.assertEquals(merged.getHeavyHitters().size(),globalStats.getHeavyHitters().size());
+
+        }
+        catch (Exception e) {

Tests fail anyway if an exception is thrown, so there is no need for the 
catch-and-fail block.



[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-93938006
  
I made some comments to the PR.
You can update it by pushing into the same branch.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575850
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/statistics/heavyhitters/LossyCountingTest.java
 ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics.heavyhitters;
+
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertTrue;
+
+/*
+* Test the structure implemented for Lossy Counting
+*/
+
+public class LossyCountingTest {
+
+    static final double fraction = 0.05;
+    static final double error = 0.005;
+    static final int seed = 7362181;
+    static final Random r = new Random(seed);
+
+    @Test
+    public void testAccuracy() {
+
+        int numItems = 100;
+        long frequency = (int)Math.ceil(numItems* fraction);
+        long minFrequency = (int)Math.ceil(numItems* (fraction-error));
+
+        int[] xs = new int[numItems];
+        int maxScale = 20;
+
+        for (int i = 0; i < numItems; i++) {
+            double p = r.nextDouble();
+            if (p<0.2){
+                xs[i] = r.nextInt(5);
+            }else {
+                int scale = r.nextInt(maxScale);
+                xs[i] = r.nextInt(1 << scale);
+            }
+        }
+
+        LossyCounting lossyCounting = new LossyCounting(fraction,error);
+
+        for (int x : xs) {
+            lossyCounting.addObject(x);
+        }
+
+        long[] actualFreq = new long[1 << maxScale];
+        for (int x : xs) {
+            actualFreq[x]++;
+        }
+
+        System.out.println("Size of heavy hitters: "+lossyCounting.heavyHitters.size());
--- End diff --

Can you use regular logging instead of `System.out` for these messages?
We have different logging configurations for Travis, local runs and the IDE, 
but we cannot control `System.out` output with our logging settings.
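
To illustrate the point about controllability, here is a small self-contained sketch. It uses `java.util.logging` purely as a stand-in so that it runs without extra dependencies; the project itself logs through SLF4J, and `ControllableOutputDemo`, `CapturingHandler` and the logger name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Shows that logger output follows the configured level, unlike System.out. */
final class ControllableOutputDemo {

    /** Collects messages so the effect of the level setting is observable. */
    static final class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                messages.add(record.getMessage());
            }
        }

        @Override
        public void flush() {}

        @Override
        public void close() {}
    }

    /** Logs one debug-level message under the given level and returns what got through. */
    static List<String> run(Level level) {
        Logger log = Logger.getLogger("demo.LossyCountingTest." + level.getName());
        log.setUseParentHandlers(false); // keep the demo output out of the console
        log.setLevel(level);
        CapturingHandler handler = new CapturingHandler();
        handler.setLevel(Level.ALL);
        log.addHandler(handler);
        log.fine("Size of heavy hitters: " + 5);
        log.removeHandler(handler);
        return handler.messages;
    }
}
```

Calling `run(Level.OFF)` yields no messages, while `run(Level.FINE)` yields the single debug message. A `System.out.println` call would print in both cases regardless of configuration.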




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575727
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/accumulators/OperatorStatsAccumulatorsTest.java
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.OperatorStatsAccumulator;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.statistics.OperatorStatistics;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+
+public class OperatorStatsAccumulatorsTest extends AbstractTestBase {
+
+   private static final String ACCUMULATOR_NAME = op-stats-accumulator;
+
+   public OperatorStatsAccumulatorsTest(){
+   super(new Configuration());
+   }
+
+   @Test
+   public void testAccumulator() {
+
+   try {
+   String input = ;
+
+   Random rand = new Random();
+
+   for (int i = 1; i  1000; i++) {
+   if(rand.nextDouble()0.2){
+   
input+=String.valueOf(rand.nextInt(5))+\n;
+   }else{
+   
input+=String.valueOf(rand.nextInt(100))+\n;
+   }
+   }
+
+   String inputFile = createTempFile(datapoints.txt, 
input);
+
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   env.readTextFile(inputFile).
+   flatMap(new StringToInt()).
+   output(new 
DiscardingOutputFormatTuple1Integer());
+   
+   JobExecutionResult result = env.execute();
+   System.out.println("Accumulator results:");
+
+   OperatorStatistics globalStats = result.getOperatorStatisticsResult(ACCUMULATOR_NAME);
+   System.out.println("Global Stats");
+   System.out.println(globalStats.toString());
+
+   OperatorStatistics[] localStats = result.getLocalOperatorStatisticsResults(ACCUMULATOR_NAME);
+   System.out.println("Local stats: 0");
+   System.out.println(localStats[0].toString());
+
+   OperatorStatistics merged = localStats[0].clone();
+   for (int i=1;i<localStats.length;i++) {
+   merged.merge(localStats[i]);
+   System.out.println("Local stats: "+i);
+   System.out.println(localStats[i].toString());
+   }
+   Assert.assertEquals(merged.getMin(),globalStats.getMin());
+   Assert.assertEquals(merged.getMax(),globalStats.getMax());
+   Assert.assertEquals(merged.estimateCountDistinct(),globalStats.estimateCountDistinct());
+   Assert.assertEquals(merged.getHeavyHitters().size(),globalStats.getHeavyHitters().size());
+
+   }
+   catch (Exception e) {
--- End diff --

I would fail the test in case of an exception
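
A minimal sketch of the difference, written in plain Java so it runs without JUnit. The class `FailOnExceptionSketch` and the method `runJob` are hypothetical stand-ins for the test body and for `env.execute()`; they are not part of the pull request. The failing variant throws an `AssertionError`, which is what JUnit 4's `Assert.fail` does internally.

```java
// Hypothetical stand-in for the test body; not part of the pull request.
class FailOnExceptionSketch {

    /** Stand-in for env.execute(): throws on request to simulate a failing job. */
    static int runJob(boolean shouldFail) throws Exception {
        if (shouldFail) {
            throw new Exception("job execution failed");
        }
        return 42;
    }

    /** Swallows the exception, so a broken job still looks like a passing test. */
    static boolean swallowing(boolean shouldFail) {
        try {
            runJob(shouldFail);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true; // reached even when the job failed
    }

    /** Converts the exception into a test failure that keeps the original cause. */
    static boolean failing(boolean shouldFail) {
        try {
            runJob(shouldFail);
        } catch (Exception e) {
            throw new AssertionError("Test failed with exception: " + e.getMessage(), e);
        }
        return true;
    }
}
```

In an actual JUnit test, the same effect could be had by calling `Assert.fail(e.getMessage())` in the catch block, or by dropping the try/catch and declaring the test method with `throws Exception`.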



[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575964
  
--- Diff: flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statistics;
+
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.clearspring.analytics.stream.cardinality.ICardinality;
+import com.clearspring.analytics.stream.cardinality.LinearCounting;
+import org.apache.flink.statistics.heavyhitters.IHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.LossyCounting;
+import org.apache.flink.statistics.heavyhitters.CountMinHeavyHitter;
+import org.apache.flink.statistics.heavyhitters.HeavyHitterMergeException;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Data structure that encapsulates statistical information of data that has only been processed by one pass
+ * This statistical information is meant to help determine the distribution of the data that has been processed
+ * in an operator so that we can determine if it is necessary to repartition the data
+ *
+ * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object.
+ *
+ * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a
+ * structure holding the heavy hitters along with their frequency.
+ *
+ */
+public class OperatorStatistics implements Serializable {
+
+   OperatorStatisticsConfig config;
+
+   Object min;
+   Object max;
+   ICardinality countDistinct;
+   IHeavyHitter heavyHitter;
+   long cardinality = 0;
+
+   public OperatorStatistics(OperatorStatisticsConfig config) {
+   this.config = config;
+   if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) {
+   countDistinct = new LinearCounting(OperatorStatisticsConfig.COUNTD_BITMAP_SIZE);
+   }
+   if(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){
+   countDistinct = new HyperLogLog(OperatorStatisticsConfig.COUNTD_LOG2M);
+   }
+   if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){
+   heavyHitter =
+   new LossyCounting(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION, OperatorStatisticsConfig.HEAVY_HITTER_ERROR);
+   }
+   if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)){
+   heavyHitter =
+   new CountMinHeavyHitter(OperatorStatisticsConfig.HEAVY_HITTER_FRACTION,
+   OperatorStatisticsConfig.HEAVY_HITTER_ERROR,
+   OperatorStatisticsConfig.HEAVY_HITTER_CONFIDENCE,
+   OperatorStatisticsConfig.HEAVY_HITTER_SEED);
+   }
+   }
+
+   public void process(Object tupleObject){
+   if (tupleObject instanceof Comparable) {
+   if (config.collectMin && (min == null || ((Comparable) tupleObject).compareTo(min) < 0)) {
+   min = tupleObject;
+   }
+   if (config.collectMax && (max == null || ((Comparable) tupleObject).compareTo(max) > 0)) {
+   max = tupleObject;
+   }
+   }
+   if (config.collectCountDistinct){
+   

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28575968
  
--- Diff: flink-core/src/main/java/org/apache/flink/statistics/OperatorStatistics.java ---
@@ -0,0 +1,154 @@

[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-93984865
  
I think this is nice work.

It would be easier to add and maintain this if it were not hardwired into 
the core Flink classes but packaged as a library instead. This would be 
a good example for the `flink-contrib` project.

The changes for this should be rather small: it would boil down to not 
adding the specialized methods for the `OperatorStatistics` to the 
RuntimeEnvironment and the `JobExecutionResult`, and instead adding a util class that 
converts an operator statistics accumulator into the OperatorStatistics object.

In general, I am a bit more comfortable with adding stuff as libraries than 
to the core API. A lot of requests and features are coming in; if we add 
them all to the core API, it will easily become unmaintainable. Staging something 
first as a library is pretty uncritical. We can always add it to the API later 
if it becomes stable and heavily used.
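
A rough sketch of what such a util class could look like. It assumes the accumulator results are available as a name-to-value map (as returned by `JobExecutionResult.getAllAccumulatorResults()`); `StatsSketch` and `OperatorStatsUtil` are hypothetical names used only for illustration, and `StatsSketch` is a reduced stand-in for the `OperatorStatistics` class in this pull request.

```java
import java.util.Map;

// Hypothetical, reduced stand-in for the statistics object from the pull request.
class StatsSketch {
    final long min;
    final long max;

    StatsSketch(long min, long max) {
        this.min = min;
        this.max = max;
    }
}

// Hypothetical library-side helper: the core API classes stay untouched.
class OperatorStatsUtil {

    /** Looks up an accumulator result by name and converts it to the statistics type. */
    static StatsSketch getOperatorStatistics(Map<String, Object> accumulatorResults, String name) {
        Object value = accumulatorResults.get(name);
        if (value == null) {
            throw new IllegalArgumentException("No accumulator named '" + name + "'");
        }
        if (!(value instanceof StatsSketch)) {
            throw new IllegalArgumentException(
                    "Accumulator '" + name + "' does not hold operator statistics");
        }
        return (StatsSketch) value;
    }
}
```

With a helper of this kind, the statistics feature only depends on the generic accumulator map, so it could live in a separate module.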


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28593902
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+
+<dependency>
+<groupId>com.clearspring.analytics</groupId>
+<artifactId>stream</artifactId>
+<version>2.7.0</version>
+<exclusions>
+<exclusion>
+<groupId>it.unimi.dsi</groupId>
+<artifactId>fastutil</artifactId>
--- End diff --

Ignore my comment. Robert is right, I overlooked that it is an exclusion, 
not a dependency...




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28593066
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+
+<dependency>
+<groupId>com.clearspring.analytics</groupId>
+<artifactId>stream</artifactId>
+<version>2.7.0</version>
+<exclusions>
+<exclusion>
+<groupId>it.unimi.dsi</groupId>
+<artifactId>fastutil</artifactId>
--- End diff --

This is an exclusion of fastutil ;)




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-17 Thread tammymendt
Github user tammymendt commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28593077
  
--- Diff: flink-core/pom.xml ---
@@ -63,6 +63,19 @@ under the License.
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+
+<dependency>
+<groupId>com.clearspring.analytics</groupId>
+<artifactId>stream</artifactId>
+<version>2.7.0</version>
+<exclusions>
+<exclusion>
+<groupId>it.unimi.dsi</groupId>
+<artifactId>fastutil</artifactId>
--- End diff --

The library we are using from clearspring implements a series of streaming 
algorithms. We are only using HyperLogLog and CountMinSketch to estimate count 
distinct and heavy hitters respectively. As far as I could see fastutil is used 
only in the implementation of a separate streaming algorithm to track 
histograms (QDigest). Since we are not using this particular algorithm, it 
should be ok to exclude fastutil from the dependency. 
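
For readers unfamiliar with the Count-Min sketch mentioned above, here is a minimal, self-contained illustration of the idea. This is not the clearspring implementation; the class name `CountMinSketchDemo` and its hash mixing are assumptions made for the example. Each item increments one counter per row, and the estimate is the minimum over the rows, so it can overestimate a frequency but never underestimate it.

```java
import java.util.Random;

// Minimal Count-Min sketch for illustration only; not the clearspring implementation.
class CountMinSketchDemo {

    private final long[][] counts; // one row of counters per hash function
    private final int[] seeds;     // one hash seed per row
    private final int width;       // number of counters in each row

    CountMinSketchDemo(int depth, int width, long seed) {
        this.counts = new long[depth][width];
        this.seeds = new int[depth];
        this.width = width;
        Random random = new Random(seed);
        for (int row = 0; row < depth; row++) {
            seeds[row] = random.nextInt();
        }
    }

    /** Maps an item to a counter index in the given row. */
    private int bucket(Object item, int row) {
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, width);
    }

    /** Records one occurrence of the item in every row. */
    void add(Object item) {
        for (int row = 0; row < counts.length; row++) {
            counts[row][bucket(item, row)]++;
        }
    }

    /** Returns an upper bound on the item's frequency: the smallest of its counters. */
    long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < counts.length; row++) {
            min = Math.min(min, counts[row][bucket(item, row)]);
        }
        return min;
    }
}
```

A heavy-hitter tracker built on top of such a sketch would additionally keep the items whose estimate exceeds a chosen fraction of the total count.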




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-16 Thread tammymendt
GitHub user tammymendt opened a pull request:

https://github.com/apache/flink/pull/605

[FLINK-1297] Added OperatorStatsAccumulator for tracking operator related 
stats

The accumulator tracks min and max values, and estimates for count distinct 
and heavy hitters.

The count distinct algorithms are Linear Counting and HyperLogLog, both 
from an imported library (clearspring).

The heavy hitters algorithms are Lossy Counting (Manku et al. 2002) and 
Count Min Sketch (Cormode 2005).

The heavy hitters algorithms are implemented in the statistics package in 
flink-core.

The accumulator currently only uses Linear Counting as default for count 
distinct and Lossy Counting as default for heavy hitters. 

The accumulator does not only track the globally merged value the way the 
other accumulators do. It additionally tracks an array of local statistics 
which have been collected at each subtask of a task. It does this by wrapping 
an extra class called OperatorStatisticsResult which holds the local and global 
accumulated results. The idea of this is to be able to track statistics of data 
processed in subtasks, so that they can be used to reason about partitioning 
strategies.
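
To illustrate the local-plus-global idea described above, here is a simplified sketch that tracks only min and max. The classes `MinMaxStats` and `StatsResultSketch` are hypothetical and much smaller than the `OperatorStatistics` and `OperatorStatisticsResult` classes in this pull request; the sketch also assumes that all merged results were created with the same number of subtasks.

```java
// Hypothetical, simplified statistics object: min and max only.
class MinMaxStats {
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;

    void process(long value) {
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    void merge(MinMaxStats other) {
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
    }
}

// Hypothetical result wrapper: one global value plus one slot per subtask.
class StatsResultSketch {
    final MinMaxStats global = new MinMaxStats();
    final MinMaxStats[] local;
    final int subtaskIndex;

    StatsResultSketch(int subtaskIndex, int numSubtasks) {
        this.subtaskIndex = subtaskIndex;
        this.local = new MinMaxStats[numSubtasks];
        for (int i = 0; i < numSubtasks; i++) {
            local[i] = new MinMaxStats();
        }
    }

    /** A subtask records a value in its own slot and in the global value. */
    void add(long value) {
        local[subtaskIndex].process(value);
        global.process(value);
    }

    /** Merging combines the global values and keeps the per-subtask slots apart. */
    void merge(StatsResultSketch other) {
        global.merge(other.global);
        for (int i = 0; i < local.length; i++) {
            local[i].merge(other.local[i]);
        }
    }
}
```

After merging the results of all subtasks, `global` describes the whole data set while `local[i]` still describes only what subtask `i` saw, which is the information needed to reason about skew between partitions.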

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tammymendt/flink FLINK-1297-v2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/605.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #605


commit f365ccd92b513f10d0ba2d1a84b210d36060947c
Author: Tamara Mendt tammyme...@gmail.com
Date:   2015-04-16T09:25:16Z

[FLINK-1297] Added an accumulator called OperatorStatsAccumulator capable 
of tracking min, max and estimates for count distinct and heavy hitters.

The count distinct algorithms are Linear Counting and HyperLogLog, both 
from an imported library from clearspring.

The heavy hitters algorithms are Lossy Counting (Manku et al. 2002) and one 
based on Count Min Sketch (Cormode 2005).

The heavy hitters algorithms are implemented in the statistics package in 
flink-core.

The accumulator does not only track the globally merged value, but tracks 
an array of local statistics which have been collected at each subtask of a 
task. It does this using an extra class called OperatorStatisticsResult






[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-16 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/605#issuecomment-93730879
  
Nice, statistics collection :-)

I'll have a look at this PR. 
It might take a few days as I am traveling.




[GitHub] flink pull request: [FLINK-1297] Added OperatorStatsAccumulator fo...

2015-04-16 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/605#discussion_r28528360
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/accumulators/OperatorStatsAccumulator.java ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.accumulators;
+
+import org.apache.flink.statistics.OperatorStatistics;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * This accumulator wraps the class {@link org.apache.flink.api.common.accumulators.OperatorStatisticsResult} to track
+ * estimated values for count distinct and heavy hitters. It has lower memory requirements than
+ * {@link org.apache.flink.api.common.accumulators.Histogram}.
+ * Our goal in accumulating stats per operator is to not only track global stats, but also know the local stats of the
+ * subtasks of a task. For this purpose, {@link org.apache.flink.api.common.accumulators.OperatorStatisticsResult}
+ * encapsulates one global accumulator and an array of local accumulators.
+ *
+ * The constructor of the class receives a subTaskIndex, and the total number of subtasks.
+ * Both parameters are found in the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} from where the constructor is called.
+ */
+public class OperatorStatsAccumulator implements Accumulator<Object, OperatorStatisticsResult> {
+
+
+   private OperatorStatisticsResult localValue;
+
+   public OperatorStatsAccumulator(int subTaskIndex, int numSubtasks){
+   localValue = new OperatorStatisticsResult(subTaskIndex, numSubtasks);
+   }
+
+   @Override
+   public void add(Object value) {
+   localValue.add(value);
+   }
+
+   @Override
+   public OperatorStatisticsResult getLocalValue() {
+   return localValue;
+   }
+
+   @Override
+   public void resetLocal() {
+   localValue.resetLocal();
+   }
+
+   @Override
+   public void merge(Accumulator<Object, OperatorStatisticsResult> other) {
+   localValue.merge(other.getLocalValue());
+   }
+
+   //todo this has not been tested
+   public void write(ObjectOutputStream out) throws IOException {
+   out.writeObject(localValue.getLocal());
+
+   }
+
+   //todo this has not been tested
+   public void read(ObjectInputStream in) throws IOException {
+   try {
+   localValue.setLocal((OperatorStatistics) in.readObject());
+   } catch (ClassNotFoundException e) {
+   e.printStackTrace();
--- End diff --

Just calling e.printStackTrace() will make the error extremely hard to debug. 
You'll find a stack trace without any message in the stderr file, and the 
Flink program will continue running in an erroneous state.
I would recommend re-throwing the exception as a runtime exception.
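
A minimal sketch of that recommendation, using only the JDK. `RethrowSketch` is a hypothetical stand-in for the accumulator and holds a plain `Object` instead of the statistics type; the `roundTrip` helper exists only to exercise the write and read path.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the accumulator; only the serialization path is shown.
class RethrowSketch {

    private Object localValue;

    RethrowSketch(Object localValue) {
        this.localValue = localValue;
    }

    Object getLocalValue() {
        return localValue;
    }

    void write(ObjectOutputStream out) throws IOException {
        out.writeObject(localValue);
    }

    void read(ObjectInputStream in) throws IOException {
        try {
            localValue = in.readObject();
        } catch (ClassNotFoundException e) {
            // Re-throw with a message and the original cause instead of printing and continuing.
            throw new RuntimeException("Could not deserialize the accumulator value", e);
        }
    }

    /** Serializes a value and reads it back through write() and read(). */
    static Object roundTrip(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                new RethrowSketch(value).write(out);
            }
            RethrowSketch copy = new RethrowSketch(null);
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.read(in);
            }
            return copy.getLocalValue();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the runtime exception carries both a message and the cause, the failure shows up where it happens instead of leaving the object in a half-initialized state.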

