Repository: flink
Updated Branches:
  refs/heads/master d443d6b02 -> d5b97b07b


[FLINK-4891] Remove flink-contrib/flink-operator-stats module


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5b97b07
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5b97b07
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5b97b07

Branch: refs/heads/master
Commit: d5b97b07b46700bb26e1405a3715206bc5db9df0
Parents: d443d6b
Author: Ufuk Celebi <[email protected]>
Authored: Mon Oct 24 11:08:08 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Oct 24 11:08:08 2016 +0200

----------------------------------------------------------------------
 flink-contrib/flink-operator-stats/pom.xml      |  75 -----
 .../operatorstatistics/OperatorStatistics.java  | 269 ------------------
 .../OperatorStatisticsAccumulator.java          |  66 -----
 .../OperatorStatisticsConfig.java               | 161 -----------
 .../heavyhitters/CountMinHeavyHitter.java       | 172 ------------
 .../heavyhitters/HeavyHitter.java               |  37 ---
 .../heavyhitters/HeavyHitterMergeException.java |  32 ---
 .../heavyhitters/LossyCounting.java             | 140 ---------
 .../OperatorStatsAccumulatorTest.java           | 281 -------------------
 .../heavyhitters/CountMinHeavyHitterTest.java   | 115 --------
 .../heavyhitters/LossyCountingTest.java         | 131 ---------
 flink-contrib/pom.xml                           |   1 -
 12 files changed, 1480 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/pom.xml b/flink-contrib/flink-operator-stats/pom.xml
deleted file mode 100644
index 6fa6ecf..0000000
--- a/flink-contrib/flink-operator-stats/pom.xml
+++ /dev/null
@@ -1,75 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0";
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-contrib</artifactId>
-        <version>1.2-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-
-    <artifactId>flink-operator-stats_2.10</artifactId>
-    <name>flink-operator-stats</name>
-
-    <packaging>jar</packaging>
-
-    <dependencies>
-
-        <!-- core dependencies -->
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.clearspring.analytics</groupId>
-            <artifactId>stream</artifactId>
-            <version>2.7.0</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>it.unimi.dsi</groupId>
-                    <artifactId>fastutil</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <!-- test dependencies -->
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils_2.10</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-    </dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java
deleted file mode 100644
index 794ab57..0000000
--- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics;
-
-import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
-import com.clearspring.analytics.stream.cardinality.HyperLogLog;
-import com.clearspring.analytics.stream.cardinality.ICardinality;
-import com.clearspring.analytics.stream.cardinality.LinearCounting;
-import org.apache.flink.contrib.operatorstatistics.heavyhitters.HeavyHitter;
-import org.apache.flink.contrib.operatorstatistics.heavyhitters.LossyCounting;
-import 
org.apache.flink.contrib.operatorstatistics.heavyhitters.CountMinHeavyHitter;
-import 
org.apache.flink.contrib.operatorstatistics.heavyhitters.HeavyHitterMergeException;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Data structure that encapsulates statistical information of data that has 
only been processed by one pass
- * This statistical information is meant to help determine the distribution of 
the data that has been processed
- * in an operator so that we can determine if it is necessary to repartition 
the data
- *
- * The statistics to be gathered are configurable and represented by a {@link 
OperatorStatisticsConfig} object.
- *
- * The information encapsulated in this class is min, max, a structure 
enabling estimation of count distinct and a
- * structure holding the heavy hitters along with their frequency.
- *
- */
-public class OperatorStatistics implements Serializable {
-
-       OperatorStatisticsConfig config;
-
-       transient Object min = null;
-       transient Object max = null;
-       long cardinality = 0;
-       transient ICardinality countDistinct;
-       transient HeavyHitter heavyHitter;
-
-       public OperatorStatistics(OperatorStatisticsConfig config) {
-               this.config = config;
-               if (config.collectCountDistinct) {
-                       if 
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING))
 {
-                               countDistinct = new 
LinearCounting(config.getCountDbitmap());
-                       }
-                       if 
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG))
 {
-                               countDistinct = new 
HyperLogLog(config.getCountDlog2m());
-                       }
-               }
-               if (config.collectHeavyHitters) {
-                       if 
(config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING))
 {
-                               heavyHitter =
-                                               new 
LossyCounting(config.getHeavyHitterFraction(), config.getHeavyHitterError());
-                       }
-                       if 
(config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH))
 {
-                               heavyHitter =
-                                               new 
CountMinHeavyHitter(config.getHeavyHitterFraction(),
-                                                               
config.getHeavyHitterError(),
-                                                               
config.getHeavyHitterConfidence(),
-                                                               
config.getHeavyHitterSeed());
-                       }
-               }
-       }
-
-       public void process(Object tupleObject){
-               if (tupleObject instanceof Comparable) {
-                       if (config.collectMin && (min == null || ((Comparable) 
tupleObject).compareTo(min) < 0)) {
-                                       min = tupleObject;
-                       }
-                       if (config.collectMax && (max == null || ((Comparable) 
tupleObject).compareTo(max) > 0)) {
-                                       max = tupleObject;
-                       }
-               }
-               if (config.collectCountDistinct){
-                       countDistinct.offer(tupleObject);
-               }
-               if (config.collectHeavyHitters){
-                       heavyHitter.addObject(tupleObject);
-               }
-               cardinality+=1;
-       }
-
-       public void merge(OperatorStatistics other) throws RuntimeException {
-               if (config.collectMin & other.min!=null && (min==null || 
((Comparable) other.min).compareTo(min) < 0)) {
-                       this.min = other.min;
-               }
-
-               if (config.collectMax && other.max!=null && (max == null || 
((Comparable) other.max).compareTo(max) > 0)) {
-                               this.max = other.max;
-               }
-               if (config.collectCountDistinct){
-                       try {
-                               ICardinality mergedCountDistinct = 
this.countDistinct.merge(new 
ICardinality[]{this.countDistinct,other.countDistinct});
-                               this.countDistinct = mergedCountDistinct;
-                       } catch (CardinalityMergeException e) {
-                               throw new RuntimeException("Error merging count 
distinct structures",e);
-                       }
-               }
-               if (config.collectHeavyHitters){
-                       try {
-                               this.heavyHitter.merge(other.heavyHitter);
-                       } catch (HeavyHitterMergeException e) {
-                               throw new RuntimeException("Error merging heavy 
hitter structures",e);
-                       }
-               }
-               this.cardinality+=other.cardinality;
-       }
-
-       public Object getMin() {
-               return min;
-       }
-
-       public Object getMax() {
-               return max;
-       }
-
-       public long estimateCountDistinct(){
-               return countDistinct.cardinality();
-       }
-
-       public Map<Object,Long> getHeavyHitters(){
-               return heavyHitter.getHeavyHitters();
-       }
-
-       @Override
-       public String toString(){
-               String out = "\ntotal cardinality: "+this.cardinality;
-               if (config.collectMax) {
-                       out += "\nmax: " + this.max;
-               }
-               if (config.collectMin){
-                       out+="\nmin: "+this.min;
-               }
-               if (config.collectCountDistinct){
-                       if 
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){
-                               out+="\ncount distinct 
estimate("+config.countDistinctAlgorithm+
-                                               
"["+config.getCountDlog2m()+"]): "+
-                                               
this.countDistinct.cardinality();
-                       }else{
-                               out+="\ncount distinct 
estimate("+config.countDistinctAlgorithm+
-                                               
"["+config.getCountDbitmap()+"]): "+
-                                               
this.countDistinct.cardinality();
-                       }
-               }
-               if (config.collectHeavyHitters){
-                       if 
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING))
 {
-                               out += "\nheavy hitters (" + 
config.heavyHitterAlgorithm +
-                                               "[" + 
config.getHeavyHitterFraction() + ", "
-                                                       + 
config.getHeavyHitterError() + "]):";
-                               out += "\n" + heavyHitter.toString();
-                       }else {
-                               out += "\nheavy hitters (" + 
config.heavyHitterAlgorithm +
-                                               "[" + 
config.getHeavyHitterFraction() + ", "
-                                                       + 
config.getHeavyHitterError() + ", "
-                                                       + 
config.getHeavyHitterConfidence() + "]):";
-                               out += "\n" + heavyHitter.toString();
-                       }
-               }
-               return out;
-       }
-
-       @Override
-       public OperatorStatistics clone(){
-               OperatorStatistics clone = new OperatorStatistics(config);
-               clone.min = min;
-               clone.max = max;
-               clone.cardinality = cardinality;
-
-               try {
-                       if (config.collectCountDistinct) {
-                               ICardinality copy;
-                               if (countDistinct instanceof LinearCounting) {
-                                       copy = new 
LinearCounting(config.getCountDbitmap());
-                               } else if (countDistinct instanceof 
HyperLogLog) {
-                                       copy = new 
HyperLogLog(config.getCountDlog2m());
-                               } else {
-                                       throw new 
IllegalStateException("Unsupported count distinct counter.");
-                               }
-                               clone.countDistinct = copy.merge(countDistinct);
-                       }
-               } catch (CardinalityMergeException e) {
-                       throw new RuntimeException("Faild to clone 
OperatorStatistics!");
-               }
-
-               try {
-                       if (config.collectHeavyHitters) {
-                               HeavyHitter copy;
-                               if (heavyHitter instanceof LossyCounting) {
-                                       copy = new 
LossyCounting(config.getHeavyHitterFraction(), config.getHeavyHitterError());
-                               } else if (heavyHitter instanceof 
CountMinHeavyHitter) {
-                                       copy = new 
CountMinHeavyHitter(config.getHeavyHitterFraction(),
-                                                       
config.getHeavyHitterError(),
-                                                       
config.getHeavyHitterConfidence(),
-                                                       
config.getHeavyHitterSeed());
-                               } else {
-                                       throw new 
IllegalStateException("Unsupported heavy hitter counter.");
-                               }
-                               copy.merge(heavyHitter);
-                               clone.heavyHitter = copy;
-                       }
-               } catch (HeavyHitterMergeException e) {
-                       throw new RuntimeException("Failed to clone 
OperatorStatistics!");
-               }
-
-               return clone;
-       }
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               out.defaultWriteObject();
-               if (config.collectMin){
-                       out.writeObject(min);
-               }
-               if (config.collectMax){
-                       out.writeObject(max);
-               }
-               if (config.collectCountDistinct){
-                       if 
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)){
-                               out.writeObject(countDistinct.getBytes());
-                       }else{
-                               out.writeObject(countDistinct);
-                       }
-               }
-               if (config.collectHeavyHitters){
-                       out.writeObject(heavyHitter);
-               }
-       }
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-               if (config.collectMin){
-                       min = in.readObject();
-               }
-               if (config.collectMax){
-                       max = in.readObject();
-               }
-               if (config.collectCountDistinct){
-                       if 
(config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)){
-                               countDistinct = new 
LinearCounting((byte[])in.readObject());
-                       }else{
-                               countDistinct = (HyperLogLog)in.readObject();
-                       }
-               }
-               if (config.collectHeavyHitters) {
-                       if 
(config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){
-                               heavyHitter = (LossyCounting)in.readObject();
-                       }else{
-                               heavyHitter = 
(CountMinHeavyHitter)in.readObject();
-                       }
-               }
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java
deleted file mode 100644
index 857a403..0000000
--- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.operatorstatistics;
-
-import org.apache.flink.api.common.accumulators.Accumulator;
-
-/**
- * This accumulator wraps the class {@link OperatorStatistics} to track
- * estimated values for count distinct and heavy hitters.
- *
- */
-public class OperatorStatisticsAccumulator implements Accumulator<Object, 
OperatorStatistics> {
-
-       private OperatorStatistics local;
-
-       public OperatorStatisticsAccumulator(){
-               local = new OperatorStatistics(new OperatorStatisticsConfig());
-       }
-
-       public OperatorStatisticsAccumulator(OperatorStatisticsConfig config){
-               local = new OperatorStatistics(config);
-       }
-
-       @Override
-       public void add(Object value) {
-               local.process(value);
-       }
-
-       @Override
-       public OperatorStatistics getLocalValue() {
-               return local;
-       }
-
-       @Override
-       public void resetLocal() {
-               local = new OperatorStatistics(new OperatorStatisticsConfig());
-       }
-
-       @Override
-       public void merge(Accumulator<Object, OperatorStatistics> other) {
-               local.merge(other.getLocalValue());
-       }
-
-       @Override
-       public Accumulator<Object, OperatorStatistics> clone() {
-               OperatorStatisticsAccumulator clone = new 
OperatorStatisticsAccumulator();
-               clone.local = this.local.clone();
-               return clone;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java
deleted file mode 100644
index d12b01d..0000000
--- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics;
-
-import java.io.Serializable;
-
-/**
- * Configures the behavior of an {@link OperatorStatistics} instance.
- *
- * Sets the parameters that determine the accuracy of the count distinct and 
heavy hitter sketches
- *
- * Defines the statistics to be collected. A boolean field indicates whether a 
given statistic should be collected or not
- *
- * Encapsulates an enum indicating which sketch should be used for count 
distinct and another indicating which sketch
- * should be used for detecting heavy hitters.
- */
-
-public class OperatorStatisticsConfig implements Serializable {
-
-       private int countDbitmap = 1000000;
-       private int countDlog2m = 10;
-
-       private int heavyHitterSeed = 121311332;
-       private double heavyHitterConfidence = 0.99;
-       private double heavyHitterFraction = 0.05;
-       private double heavyHitterError = 0.0005;
-
-       public boolean collectMin;
-       public boolean collectMax;
-       public boolean collectCountDistinct;
-       public boolean collectHeavyHitters;
-       public CountDistinctAlgorithm countDistinctAlgorithm;
-       public HeavyHitterAlgorithm heavyHitterAlgorithm;
-
-       public OperatorStatisticsConfig(){
-               this.collectMin = true;
-               this.collectMax = true;
-               this.collectCountDistinct = true;
-               this.collectHeavyHitters = true;
-               this.countDistinctAlgorithm = 
CountDistinctAlgorithm.HYPERLOGLOG;
-               this.heavyHitterAlgorithm = HeavyHitterAlgorithm.LOSSY_COUNTING;
-       }
-
-       public OperatorStatisticsConfig(boolean collect){
-               this.collectMin = collect;
-               this.collectMax = collect;
-               this.collectCountDistinct = collect;
-               this.collectHeavyHitters = collect;
-               this.countDistinctAlgorithm = 
CountDistinctAlgorithm.HYPERLOGLOG; //Defaut algorithm
-               this.heavyHitterAlgorithm = 
HeavyHitterAlgorithm.LOSSY_COUNTING; //Default algorithm
-       }
-
-       public OperatorStatisticsConfig(CountDistinctAlgorithm countDistinct, 
HeavyHitterAlgorithm heavyHitter) {
-               this.collectMin = true;
-               this.collectMax = true;
-               this.collectCountDistinct = true;
-               this.collectHeavyHitters = true;
-               this.countDistinctAlgorithm = countDistinct;
-               this.heavyHitterAlgorithm = heavyHitter;
-       }
-
-       public enum CountDistinctAlgorithm {
-
-               LINEAR_COUNTING,
-               HYPERLOGLOG;
-
-       }
-
-       public enum HeavyHitterAlgorithm{
-               LOSSY_COUNTING,
-               COUNT_MIN_SKETCH;
-       }
-
-       public void setCountDbitmap(int countDbitmap) {
-               this.countDbitmap = countDbitmap;
-       }
-
-       public void setCountDlog2m(int countDlog2m) {
-               this.countDlog2m = countDlog2m;
-       }
-
-       public void setHeavyHitterConfidence(double heavyHitterConfidence) {
-               this.heavyHitterConfidence = heavyHitterConfidence;
-       }
-
-       public void setHeavyHitterSeed(int heavyHitterSeed) {
-               this.heavyHitterSeed = heavyHitterSeed;
-       }
-
-       public void setHeavyHitterFraction(double heavyHitterFraction) {
-               this.heavyHitterFraction = heavyHitterFraction;
-       }
-
-       public void setHeavyHitterError(double heavyHitterError) {
-               this.heavyHitterError = heavyHitterError;
-       }
-
-       public void setCollectMin(boolean collectMin) {
-               this.collectMin = collectMin;
-       }
-
-       public void setCollectMax(boolean collectMax) {
-               this.collectMax = collectMax;
-       }
-
-       public void setCollectCountDistinct(boolean collectCountDistinct) {
-               this.collectCountDistinct = collectCountDistinct;
-       }
-
-       public void setCollectHeavyHitters(boolean collectHeavyHitters) {
-               this.collectHeavyHitters = collectHeavyHitters;
-       }
-
-       public void setCountDistinctAlgorithm(CountDistinctAlgorithm 
countDistinctAlgorithm) {
-               this.countDistinctAlgorithm = countDistinctAlgorithm;
-       }
-
-       public void setHeavyHitterAlgorithm(HeavyHitterAlgorithm 
heavyHitterAlgorithm) {
-               this.heavyHitterAlgorithm = heavyHitterAlgorithm;
-       }
-
-       public int getCountDbitmap() {
-               return countDbitmap;
-       }
-
-       public int getCountDlog2m() {
-               return countDlog2m;
-       }
-
-       public int getHeavyHitterSeed() {
-               return heavyHitterSeed;
-       }
-
-       public double getHeavyHitterConfidence() {
-               return heavyHitterConfidence;
-       }
-
-       public double getHeavyHitterFraction() {
-               return heavyHitterFraction;
-       }
-
-       public double getHeavyHitterError() {
-               return heavyHitterError;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java
deleted file mode 100644
index 30a6c1b..0000000
--- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics.heavyhitters;
-
-import com.clearspring.analytics.hash.MurmurHash;
-import com.clearspring.analytics.stream.frequency.CountMinSketch;
-
-import java.io.Serializable;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/*
- * This class tracks heavy hitters using the {@link 
com.clearspring.analytics.stream.frequency.CountMinSketch} structure
- * to estimate frequencies.
- */
-public class CountMinHeavyHitter implements HeavyHitter, Serializable {
-
-       private transient CountMinSketch countMinSketch;
-       private HashMap<Object,Long> heavyHitters;
-       private double fraction;
-       private double error;
-       private long cardinality;
-
-       public CountMinHeavyHitter(double fraction, double error, double 
confidence, int seed){
-               this.countMinSketch = new CountMinSketch(error,confidence,seed);
-               this.error = error;
-               this.cardinality = 0;
-               this.fraction = fraction;
-               this.heavyHitters = new HashMap<Object,Long>();
-       }
-
-       public CountMinHeavyHitter(CountMinSketch countMinSketch, double 
fraction){
-               this.countMinSketch = countMinSketch;
-               this.error = countMinSketch.getRelativeError();
-               this.cardinality = 0;
-               this.fraction = fraction;
-               this.heavyHitters = new HashMap<Object,Long>();
-       }
-
-       @Override
-       public void addObject(Object o) {
-               cardinality +=1;
-               if (o instanceof Long){
-                       countMinSketch.add((Long)o, 1);
-               }else{
-                       countMinSketch.add(MurmurHash.hash(o), 1);
-               }
-               updateHeavyHitters(o);
-       }
-
-       private void updateHeavyHitters(Object item){
-               long minFrequency = (long)Math.ceil(cardinality * fraction);
-               long estimateCount = estimateCount(item);
-
-               if (estimateCount >= minFrequency){
-                       heavyHitters.put(item, estimateCount);
-               }
-
-               if (cardinality%(long)Math.ceil(1/error)==0) {
-                       removeNonFrequent(minFrequency);
-               }
-       }
-
-       private void removeNonFrequent(long minFrequency){
-               Iterator it = heavyHitters.entrySet().iterator();
-               while (it.hasNext()) {
-                       if (((Map.Entry<Object,Long>)it.next()).getValue() < 
minFrequency) {
-                               it.remove();
-                       }
-               }
-       }
-
-       public long estimateCount(Object item){
-               if (item instanceof Long){
-                       return countMinSketch.estimateCount((Long)item);
-               }else{
-                       return 
countMinSketch.estimateCount(MurmurHash.hash(item));
-               }
-       }
-
-       public void merge(HeavyHitter toMerge) throws 
CMHeavyHitterMergeException {
-
-               try {
-                       CountMinHeavyHitter cmToMerge = 
(CountMinHeavyHitter)toMerge;
-                       if (this.fraction != cmToMerge.fraction) {
-                               throw new CMHeavyHitterMergeException("The 
fraction for both heavy hitters must be the same");
-                       }
-
-                       this.countMinSketch = 
CountMinSketch.merge(this.countMinSketch, cmToMerge.countMinSketch);
-
-                       HashMap<Object,Long> mergedHeavyHitters = new 
HashMap<Object, Long>();
-
-                       for (Map.Entry<Object, Long> entry : 
this.heavyHitters.entrySet()) {
-                               mergedHeavyHitters.put(entry.getKey(), 
estimateCount(entry.getKey()));
-                       }
-
-                       for (Map.Entry<Object, Long> entry : 
cmToMerge.heavyHitters.entrySet()) {
-                               if 
(!mergedHeavyHitters.containsKey(entry.getKey())) {
-                                       mergedHeavyHitters.put(entry.getKey(), 
estimateCount(entry.getKey()));
-                               }
-                       }
-                       this.heavyHitters = mergedHeavyHitters;
-                       cardinality+=cmToMerge.cardinality;
-
-               }catch (ClassCastException ex){
-                       throw new CMHeavyHitterMergeException("Both heavy 
hitter objects must belong to the same class", ex);
-               }catch (Exception ex){
-                       throw new CMHeavyHitterMergeException("Cannot merge 
count min sketches: ", ex);
-               }
-       }
-
-
-       @Override
-       public HashMap<Object,Long> getHeavyHitters() {
-               long minFrequency = (long)Math.ceil(cardinality * fraction);
-               removeNonFrequent(minFrequency);
-               return heavyHitters;
-       }
-
-       protected static class CMHeavyHitterMergeException extends 
HeavyHitterMergeException {
-
-               public CMHeavyHitterMergeException(String message) {
-                       super(message);
-               }
-
-               public CMHeavyHitterMergeException(String message, Throwable 
cause) {
-                       super(message, cause);
-               }
-       }
-
-       @Override
-       public String toString(){
-               String out = "";
-               Map<Object, Long> heavyHitters = getHeavyHitters();
-               for (Map.Entry<Object, Long> entry : heavyHitters.entrySet()){
-                       out += entry.getKey().toString() + " -> estimated freq. 
" + entry.getValue() + "\n";
-               }
-               return out;
-       }
-
-       private void writeObject(ObjectOutputStream oos) throws IOException{
-               oos.defaultWriteObject();
-               oos.writeObject(CountMinSketch.serialize(countMinSketch));
-       }
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-               byte[] countMinBytes = (byte[]) in.readObject();
-               countMinSketch = CountMinSketch.deserialize(countMinBytes);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java
 
b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java
deleted file mode 100644
index bf6f4bf..0000000
--- 
a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics.heavyhitters;
-
-import java.util.HashMap;
-
-/**
- * Interface for classes that track heavy hitters. It follows the same design 
as the interfaces from
- * {@link com.clearspring.analytics.stream}
- */
-public interface HeavyHitter {
-
-       void addObject(Object o);
-
-       HashMap getHeavyHitters();
-
-       void merge(HeavyHitter toMerge) throws HeavyHitterMergeException;
-
-       String toString();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java
 
b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java
deleted file mode 100644
index 7a65a53..0000000
--- 
a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.operatorstatistics.heavyhitters;
-
-/**
- * Exception that is thrown when incompatible heavy hitter sketches attempt to 
be merged.
- */
-public class HeavyHitterMergeException extends Exception {
-
-       public HeavyHitterMergeException(String message) {
-               super(message);
-       }
-
-       public HeavyHitterMergeException(String message, Throwable cause) {
-               super(message, cause);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java
 
b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java
deleted file mode 100644
index ad74401..0000000
--- 
a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics.heavyhitters;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Implements Lossy Counting algorithm (Manku, G.S., Motwani, R.: Approximate 
frequency counts over data streams, 2002)
- * The algorithm tracks heavy hitters in a count based fashion. It stores 
heavy hitters, along with a lower bound for
- * their frequency and an error, which determines the upperbound for the 
frequency. It is guaranteed to output all
- * elements with freq. higher than a given threshold and to not output any 
element with a frequency under a given
- * error threshold.
- */
-public class LossyCounting implements HeavyHitter, Serializable{
-
-       private double fraction;
-       private double error;
-       private long cardinality;
-       private Map<Object,Counter> heavyHitters;
-       private long bucket;
-
-       private class Counter implements Serializable {
-               long lowerBound;
-               long frequencyError;
-
-               private Counter(long lowerBound, long frequencyError){
-                       this.lowerBound = lowerBound;
-                       this.frequencyError = frequencyError;
-               }
-
-               private void updateLowerBound(long count){
-                       lowerBound+=count;
-               }
-
-               private long getUpperBound(){
-                       return lowerBound + frequencyError;
-               }
-
-       }
-
-       public LossyCounting(double fraction, double error){
-
-               this.fraction = fraction;
-               this.error = error;
-               this.cardinality = 0;
-               this.heavyHitters = new HashMap<Object, Counter>();
-               this.bucket = 0;
-       }
-
-       @Override
-       public void addObject(Object o) {
-               cardinality+=1;
-               if (heavyHitters.containsKey(o)){
-                       heavyHitters.get(o).updateLowerBound(1);
-               }else{
-                       heavyHitters.put(o,new Counter(1, bucket));
-               }
-               if (cardinality%(long)Math.ceil(1/error)==0) {
-                       bucket += 1;
-                       updateHeavyHitters();
-               }
-       }
-
-       public void updateHeavyHitters(){
-               Iterator it = heavyHitters.entrySet().iterator();
-               while (it.hasNext()) {
-                       if 
(((Map.Entry<Object,Counter>)it.next()).getValue().getUpperBound()< bucket) {
-                               it.remove();
-                       }
-               }
-       }
-
-       public void merge(HeavyHitter toMerge) throws HeavyHitterMergeException 
{
-               try{
-                       LossyCounting lsToMerge = (LossyCounting)toMerge;
-                       if (this.fraction!=lsToMerge.fraction){
-                               throw new HeavyHitterMergeException("Both heavy 
hitter structures must be identical");
-                       }
-                       this.cardinality+=lsToMerge.cardinality;
-                       this.bucket = (long)Math.floor(cardinality*error);
-                       for (Map.Entry<Object, Counter> entry : 
lsToMerge.heavyHitters.entrySet()){
-                               Counter counter = 
this.heavyHitters.get(entry.getKey());
-                               if (counter==null){
-                                       
this.heavyHitters.put(entry.getKey(),entry.getValue());
-                               }else{
-                                       Counter mergingCounter = 
entry.getValue();
-                                       this.heavyHitters.put(entry.getKey(),
-                                                       new 
Counter(mergingCounter.lowerBound+counter.lowerBound, 
mergingCounter.frequencyError +counter.frequencyError));
-                               }
-                       }
-                       updateHeavyHitters();
-               }catch (ClassCastException ex){
-                       throw new HeavyHitterMergeException("Both heavy hitter 
structures must be identical");
-               }
-       }
-
-       @Override
-       public HashMap<Object,Long> getHeavyHitters() {
-               HashMap<Object,Long> heavyHitterLowerBounds = new 
HashMap<Object, Long>();
-               long minFrequency = 
(long)Math.ceil(cardinality*(fraction-error));
-               for (Map.Entry<Object, Counter> entry : 
heavyHitters.entrySet()){
-                       if(entry.getValue().lowerBound>=minFrequency){
-                               heavyHitterLowerBounds.put(entry.getKey(), 
entry.getValue().lowerBound);
-                       }
-               }
-               return heavyHitterLowerBounds;
-       }
-
-       @Override
-       public String toString(){
-               String out = "";
-               long minFrequency = 
(long)Math.ceil(cardinality*(fraction-error));
-               for (Map.Entry<Object, Counter> entry : 
heavyHitters.entrySet()){
-                       if(entry.getValue().lowerBound>=minFrequency) {
-                               out += entry.getKey().toString() + " -> lower 
bound " + entry.getValue().lowerBound + "\n";
-                       }
-               }
-               return out;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
 
b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
deleted file mode 100644
index 82123e0..0000000
--- 
a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Random;
-
-@SuppressWarnings("serial")
-public class OperatorStatsAccumulatorTest extends AbstractTestBase {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(OperatorStatsAccumulatorTest.class);
-
-       private static final String ACCUMULATOR_NAME = "op-stats";
-
-       public OperatorStatsAccumulatorTest() {
-               super(new Configuration());
-       }
-       
-       public static class StringToInt extends RichFlatMapFunction<String, 
Tuple1<Integer>> {
-
-               // Is instantiated later since the runtime context is not yet 
initialized
-               private Accumulator<Object, Serializable> globalAccumulator;
-               private Accumulator<Object,Serializable> localAccumulator;
-               OperatorStatisticsConfig accumulatorConfig;
-
-               public StringToInt(OperatorStatisticsConfig config){
-                       accumulatorConfig = config;
-               }
-
-               @Override
-               public void open(Configuration parameters) {
-                       // Add globalAccumulator using convenience function
-
-                       globalAccumulator = 
getRuntimeContext().getAccumulator(ACCUMULATOR_NAME);
-                       if (globalAccumulator==null){
-                               
getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, new 
OperatorStatisticsAccumulator(accumulatorConfig));
-                               globalAccumulator = 
getRuntimeContext().getAccumulator(ACCUMULATOR_NAME);
-                       }
-
-                       int subTaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-                       localAccumulator = 
getRuntimeContext().getAccumulator(ACCUMULATOR_NAME+"-"+subTaskIndex);
-                       if (localAccumulator==null){
-                               
getRuntimeContext().addAccumulator(ACCUMULATOR_NAME+"-"+subTaskIndex, new 
OperatorStatisticsAccumulator(accumulatorConfig));
-                               localAccumulator = 
getRuntimeContext().getAccumulator(ACCUMULATOR_NAME+"-"+subTaskIndex);
-                       }
-               }
-
-               @Override
-               public void flatMap(String value, Collector<Tuple1<Integer>> 
out) throws Exception {
-                       int intValue;
-                       try {
-                               intValue = Integer.parseInt(value);
-                               localAccumulator.add(intValue);
-                               out.collect(new Tuple1<>(intValue));
-                       } catch (NumberFormatException ignored) {}
-               }
-
-               @Override
-               public void close(){
-                       globalAccumulator.merge(localAccumulator);
-               }
-       }
-
-       @Test
-       public void testAccumulatorAllStatistics() throws Exception {
-
-               String input = "";
-
-               Random rand = new Random();
-
-               for (int i = 1; i < 1000; i++) {
-                       if(rand.nextDouble()<0.2){
-                               input+=String.valueOf(rand.nextInt(4))+"\n";
-                       }else{
-                               input+=String.valueOf(rand.nextInt(100))+"\n";
-                       }
-               }
-
-               String inputFile = createTempFile("datapoints.txt", input);
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-
-               OperatorStatisticsConfig operatorStatisticsConfig =
-                               new 
OperatorStatisticsConfig(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG,
-                                                                               
        OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING);
-
-               env.readTextFile(inputFile).
-                               flatMap(new 
StringToInt(operatorStatisticsConfig)).
-                               output(new 
DiscardingOutputFormat<Tuple1<Integer>>());
-
-               JobExecutionResult result = env.execute();
-
-               OperatorStatistics globalStats = 
result.getAccumulatorResult(ACCUMULATOR_NAME);
-//             System.out.println("Global Stats");
-//             System.out.println(globalStats.toString());
-
-               OperatorStatistics merged = null;
-
-               Map<String,Object> accResults = 
result.getAllAccumulatorResults();
-               for (String accumulatorName:accResults.keySet()){
-                       if (accumulatorName.contains(ACCUMULATOR_NAME+"-")){
-                               OperatorStatistics localStats = 
(OperatorStatistics) accResults.get(accumulatorName);
-                               LOG.debug("Local Stats: " + accumulatorName);
-                               LOG.debug(localStats.toString());
-                               if (merged == null){
-                                       merged = localStats.clone();
-                               }else {
-                                       merged.merge(localStats);
-                               }
-                       }
-               }
-
-               LOG.debug("Local Stats Merged: \n");
-               LOG.debug(merged.toString());
-
-               Assert.assertEquals("Global cardinality should be 999", 999, 
globalStats.cardinality);
-               Assert.assertEquals("Count distinct estimate should be around 
100 and is "+globalStats.estimateCountDistinct()
-                               , 100.0, 
(double)globalStats.estimateCountDistinct(),5.0);
-               Assert.assertTrue("The total number of heavy hitters should be 
between 0 and 5."
-                               , globalStats.getHeavyHitters().size() > 0 && 
globalStats.getHeavyHitters().size() <= 5);
-               Assert.assertEquals("Min when merging the local accumulators 
should correspond with min" +
-                               "of the global 
accumulator",merged.getMin(),globalStats.getMin());
-               Assert.assertEquals("Max resulting from merging the local 
accumulators should correspond to" +
-                               "max of the global 
accumulator",merged.getMax(),globalStats.getMax());
-               Assert.assertEquals("Count distinct when merging the local 
accumulators should correspond to " +
-                               "count distinct in the global 
accumulator",merged.estimateCountDistinct(),globalStats.estimateCountDistinct());
-               Assert.assertEquals("The number of heavy hitters when merging 
the local accumulators should correspond " +
-                               "to the number of heavy hitters in the global 
accumulator",merged.getHeavyHitters().size(),globalStats.getHeavyHitters().size());
-       }
-
-       @Test
-       public void testAccumulatorMinMax() throws Exception {
-
-               String input = "";
-
-               Random rand = new Random();
-
-               for (int i = 1; i < 1000; i++) {
-                       if (rand.nextDouble() < 0.2) {
-                               input += String.valueOf(rand.nextInt(4)) + "\n";
-                       } else {
-                               input += String.valueOf(rand.nextInt(100)) + 
"\n";
-                       }
-               }
-
-               String inputFile = createTempFile("datapoints.txt", input);
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-
-               OperatorStatisticsConfig operatorStatisticsConfig =
-                               new OperatorStatisticsConfig(false);
-               operatorStatisticsConfig.collectMax = true;
-               operatorStatisticsConfig.collectMin = true;
-
-               env.readTextFile(inputFile).
-                               flatMap(new 
StringToInt(operatorStatisticsConfig)).
-                               output(new 
DiscardingOutputFormat<Tuple1<Integer>>());
-
-               JobExecutionResult result = env.execute();
-
-               OperatorStatistics globalStats = 
result.getAccumulatorResult(ACCUMULATOR_NAME);
-//             System.out.println("Global Stats");
-//             System.out.println(globalStats.toString());
-
-               Assert.assertTrue("Min value for accumulator should not be 
null",globalStats.getMin()!=null);
-               Assert.assertTrue("Max value for accumulator should not be 
null",globalStats.getMax()!=null);
-       }
-
-       @Test
-       public void testAccumulatorCountDistinctLinearCounting() throws 
Exception {
-
-               String input = "";
-
-               Random rand = new Random();
-
-               for (int i = 1; i < 1000; i++) {
-                       if (rand.nextDouble() < 0.2) {
-                               input += String.valueOf(rand.nextInt(4)) + "\n";
-                       } else {
-                               input += String.valueOf(rand.nextInt(100)) + 
"\n";
-                       }
-               }
-
-               String inputFile = createTempFile("datapoints.txt", input);
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-
-               OperatorStatisticsConfig operatorStatisticsConfig =
-                               new OperatorStatisticsConfig(false);
-               operatorStatisticsConfig.collectCountDistinct = true;
-               operatorStatisticsConfig.countDistinctAlgorithm = 
OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING;
-               operatorStatisticsConfig.setCountDbitmap(10000);
-
-               env.readTextFile(inputFile).
-                               flatMap(new 
StringToInt(operatorStatisticsConfig)).
-                               output(new 
DiscardingOutputFormat<Tuple1<Integer>>());
-
-               JobExecutionResult result = env.execute();
-
-               OperatorStatistics globalStats = 
result.getAccumulatorResult(ACCUMULATOR_NAME);
-//             System.out.println("Global Stats");
-//             System.out.println(globalStats.toString());
-
-               Assert.assertTrue("Count Distinct for accumulator should not be 
null",globalStats.countDistinct!=null);
-       }
-
-       @Test
-       public void testAccumulatorHeavyHitterCountMinSketch() throws Exception 
{
-
-               String input = "";
-
-               Random rand = new Random();
-
-               for (int i = 1; i < 1000; i++) {
-                       if (rand.nextDouble() < 0.2) {
-                               input += String.valueOf(rand.nextInt(4)) + "\n";
-                       } else {
-                               input += String.valueOf(rand.nextInt(100)) + 
"\n";
-                       }
-               }
-
-               String inputFile = createTempFile("datapoints.txt", input);
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().disableSysoutLogging();
-
-               OperatorStatisticsConfig operatorStatisticsConfig =
-                               new OperatorStatisticsConfig(false);
-               operatorStatisticsConfig.collectHeavyHitters = true;
-               operatorStatisticsConfig.heavyHitterAlgorithm = 
OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH;
-
-               env.readTextFile(inputFile).
-                               flatMap(new 
StringToInt(operatorStatisticsConfig)).
-                               output(new 
DiscardingOutputFormat<Tuple1<Integer>>());
-
-               JobExecutionResult result = env.execute();
-
-               OperatorStatistics globalStats = 
result.getAccumulatorResult(ACCUMULATOR_NAME);
-//             System.out.println("Global Stats");
-//             System.out.println(globalStats.toString());
-
-               Assert.assertTrue("Count Distinct for accumulator should not be 
null",globalStats.heavyHitter!=null);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java
 
b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java
deleted file mode 100644
index 9deaec6..0000000
--- 
a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics.heavyhitters;
-
-import com.clearspring.analytics.stream.frequency.CountMinSketch;
-import org.junit.Test;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Random;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test the structure built to track heavy hitters using the count min sketch 
from
- * {@link com.clearspring.analytics.stream.frequency.CountMinSketch}
- */
-public class CountMinHeavyHitterTest {
-
-       static final double fraction = 0.01;
-       static final double error = 0.005;
-       static final double confidence = 0.99;
-       static final int seed = 7362181;
-       static final Random r = new Random();
-       static final int cardinality = 1000000;
-       static final int maxScale = 100000;
-
-       @Test
-       public void testAccuracy() {
-
-               long[] actualFreq = new long[maxScale];
-
-               CountMinHeavyHitter cmTopK = new 
CountMinHeavyHitter(fraction,error,confidence,seed);
-
-               for (int i = 0; i < cardinality; i++) {
-                       int value;
-                       if (r.nextDouble()<0.1){
-                               value = r.nextInt(10);
-                       }else{
-                               value = r.nextInt(maxScale);
-                       }
-                       cmTopK.addObject(value);
-                       actualFreq[value]++;
-               }
-
-               long frequency = (long)Math.ceil(cardinality* fraction);
-               for (int i=0;i<actualFreq.length;i++){
-                       if (actualFreq[i]>frequency){
-                               assertTrue("Heavy Hitter not found :" + i +","+ 
actualFreq[i], cmTopK.getHeavyHitters().containsKey(i));
-                       }
-               }
-
-               Iterator it = cmTopK.getHeavyHitters().entrySet().iterator();
-               while (it.hasNext()) {
-                       Map.Entry heavyHitter = (Map.Entry)it.next();
-                       Long estimateError = (Long)heavyHitter.getValue() - 
actualFreq[(Integer)heavyHitter.getKey()];
-                       assertTrue("Difference between real frequency and 
estimate is too large: " + estimateError,
-                                       estimateError < (error*cardinality));
-               }
-       }
-
-       @Test
-       public void merge() throws 
CountMinHeavyHitter.CMHeavyHitterMergeException {
-
-               int numToMerge = 5;
-
-               long[] actualFreq = new long[maxScale];
-               CountMinHeavyHitter merged = new 
CountMinHeavyHitter(fraction,error,confidence,seed);
-               long totalCardinality = 0;
-
-               CountMinHeavyHitter[] sketches = new 
CountMinHeavyHitter[numToMerge];
-               for (int i = 0; i < numToMerge; i++) {
-                       CountMinSketch cms = new CountMinSketch(error, 
confidence, seed);
-                       sketches[i] = new CountMinHeavyHitter(cms, fraction);
-                       for (int j = 0; j < cardinality; j++) {
-                               int val;
-                               if (r.nextDouble()<0.1){
-                                       val = r.nextInt(10);
-                               }else{
-                                       val = r.nextInt(maxScale);
-                               }
-                               sketches[i].addObject(val);
-                               actualFreq[val]++;
-                               totalCardinality++;
-                       }
-                       merged.merge(sketches[i]);
-               }
-
-               Map<Object,Long> mergedHeavyHitters = merged.getHeavyHitters();
-               long frequency = (long)(totalCardinality*fraction);
-
-               for (int i = 0; i < actualFreq.length; ++i) {
-                       if (actualFreq[i] >= frequency) {
-                               assertTrue("All items with freq. > s.n will be 
found. Item " + i + ". Real freq. " + actualFreq[i] + " Expected freq." + 
frequency, mergedHeavyHitters.containsKey(i));
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java
 
b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java
deleted file mode 100644
index 502b361..0000000
--- 
a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.operatorstatistics.heavyhitters;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-import java.util.Random;
-
-import static org.junit.Assert.assertTrue;
-
-/*
-* Test the structure implemented for Lossy Counting
-*/
-
-public class LossyCountingTest {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(LossyCountingTest.class);
-
-       static final double fraction = 0.01;
-       static final double error = 0.005;
-       static final Random r = new Random();
-       static final int cardinality = 1000000;
-       static final int maxScale = 100000;
-
-       @Test
-       public void testAccuracy() {
-
-               long[] actualFreq = new long[maxScale];
-
-               LossyCounting lossyCounting = new LossyCounting(fraction,error);
-
-               for (int i = 0; i < cardinality; i++) {
-                       int value;
-                       if (r.nextDouble()<0.1){
-                               value = r.nextInt(10);
-                       }else{
-                               value = r.nextInt(maxScale);
-                       }
-                       lossyCounting.addObject(value);
-                       actualFreq[value]++;
-               }
-
-               LOG.debug("Size of heavy hitters: 
"+lossyCounting.getHeavyHitters().size());
-               LOG.debug(lossyCounting.toString());
-
-               Map<Object,Long> heavyHitters = lossyCounting.getHeavyHitters();
-               long frequency = (long)Math.ceil(cardinality* fraction);
-               long minFrequency = (long)Math.ceil(cardinality* 
(fraction-error));
-
-               for (int i = 0; i < actualFreq.length; ++i) {
-                       if (actualFreq[i]>=frequency) {
-                               assertTrue("All items with freq. > s.n will be 
found. Item " + i + ". Real freq. " + actualFreq[i] + " Expected freq." + 
frequency, heavyHitters.containsKey(i));
-                       }
-                       if (heavyHitters.containsKey(i)){
-                               assertTrue("no item with freq. < (s-e).n will 
be found. Item " + i + ". Real freq. " + actualFreq[i]+" Min freq."+ 
minFrequency, actualFreq[i]>=minFrequency);
-                               assertTrue("the estimated freq. underestimates 
the true freq. by < e.n. Real freq. " + actualFreq[i] + " Lower bound 
"+heavyHitters.get(i),
-                                               
Math.abs(heavyHitters.get(i)-actualFreq[i]) < error*cardinality);
-                       }
-               }
-       }
-
-       @Test
-       public void merge() throws HeavyHitterMergeException {
-               int numToMerge = 5;
-               LossyCounting merged = new LossyCounting(fraction,error);
-               LossyCounting[] sketches = new LossyCounting[numToMerge];
-
-               long[] actualFreq = new long[maxScale];
-               long totalCardinality = 0;
-
-               for (int i = 0; i < numToMerge; i++) {
-                       sketches[i] = new LossyCounting(fraction,error);
-                       for (int j = 0; j < cardinality; j++) {
-                               int val;
-                               if (r.nextDouble()<0.1){
-                                       val = r.nextInt(10);
-                               }else{
-                                       val = r.nextInt(maxScale);
-                               }
-                               sketches[i].addObject(val);
-                               actualFreq[val]++;
-                               totalCardinality++;
-                       }
-                       merged.merge(sketches[i]);
-               }
-
-               System.out.println("\nMERGED\n" + merged.toString());
-
-               Map<Object,Long> mergedHeavyHitters = merged.getHeavyHitters();
-               long frequency = (long)(totalCardinality*fraction);
-               long minFrequency = (long)(totalCardinality*(fraction-error));
-
-               System.out.println("Frequency Threshold:" + frequency);
-               System.out.println("False positive Threshold:" + minFrequency);
-               System.out.println("Frequency of 14:" + actualFreq[14]);
-
-               System.out.println("Real frequent items: ");
-
-               for (int i = 0; i < actualFreq.length; ++i) {
-                       if (actualFreq[i]>=frequency) {
-                               System.out.println(i+": "+actualFreq[i]);
-                               assertTrue("All items with freq. > s.n will be 
found. Item " + i + ". Real freq. " + actualFreq[i]+" Expected freq."+ 
frequency, mergedHeavyHitters.containsKey(i));
-                       }
-                       if (mergedHeavyHitters.containsKey(i)){
-                               assertTrue("no item with freq. < (s-e).n will 
be found. Item " + i + ". Real freq. " + actualFreq[i]+" Min freq."+ 
minFrequency, actualFreq[i]>=minFrequency);
-                               assertTrue("the estimated freq. underestimates 
the true freq. by < e.n. Real freq. " + actualFreq[i] + " Lower bound 
"+mergedHeavyHitters.get(i),
-                                               
Math.abs(mergedHeavyHitters.get(i)-actualFreq[i]) < error*cardinality);
-                       }
-               }
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml
index be34446..6b01acd 100644
--- a/flink-contrib/pom.xml
+++ b/flink-contrib/pom.xml
@@ -41,7 +41,6 @@ under the License.
                <module>flink-storm-examples</module>
                <module>flink-streaming-contrib</module>
                <module>flink-tweet-inputformat</module>
-               <module>flink-operator-stats</module>
                <module>flink-connector-wikiedits</module>
                <module>flink-statebackend-rocksdb</module>
        </modules>

Reply via email to