Repository: flink Updated Branches: refs/heads/master d443d6b02 -> d5b97b07b
[FLINK-4891] Remove flink-contrib/flink-operator-stats module Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d5b97b07 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d5b97b07 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d5b97b07 Branch: refs/heads/master Commit: d5b97b07b46700bb26e1405a3715206bc5db9df0 Parents: d443d6b Author: Ufuk Celebi <[email protected]> Authored: Mon Oct 24 11:08:08 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Mon Oct 24 11:08:08 2016 +0200 ---------------------------------------------------------------------- flink-contrib/flink-operator-stats/pom.xml | 75 ----- .../operatorstatistics/OperatorStatistics.java | 269 ------------------ .../OperatorStatisticsAccumulator.java | 66 ----- .../OperatorStatisticsConfig.java | 161 ----------- .../heavyhitters/CountMinHeavyHitter.java | 172 ------------ .../heavyhitters/HeavyHitter.java | 37 --- .../heavyhitters/HeavyHitterMergeException.java | 32 --- .../heavyhitters/LossyCounting.java | 140 --------- .../OperatorStatsAccumulatorTest.java | 281 ------------------- .../heavyhitters/CountMinHeavyHitterTest.java | 115 -------- .../heavyhitters/LossyCountingTest.java | 131 --------- flink-contrib/pom.xml | 1 - 12 files changed, 1480 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/pom.xml b/flink-contrib/flink-operator-stats/pom.xml deleted file mode 100644 index 6fa6ecf..0000000 --- a/flink-contrib/flink-operator-stats/pom.xml +++ /dev/null @@ -1,75 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-contrib</artifactId> - <version>1.2-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-operator-stats_2.10</artifactId> - <name>flink-operator-stats</name> - - <packaging>jar</packaging> - - <dependencies> - - <!-- core dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>com.clearspring.analytics</groupId> - <artifactId>stream</artifactId> - <version>2.7.0</version> - <exclusions> - <exclusion> - <groupId>it.unimi.dsi</groupId> - <artifactId>fastutil</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils_2.10</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java deleted file mode 100644 index 794ab57..0000000 --- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatistics.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics; - -import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; -import com.clearspring.analytics.stream.cardinality.HyperLogLog; -import com.clearspring.analytics.stream.cardinality.ICardinality; -import com.clearspring.analytics.stream.cardinality.LinearCounting; -import org.apache.flink.contrib.operatorstatistics.heavyhitters.HeavyHitter; -import org.apache.flink.contrib.operatorstatistics.heavyhitters.LossyCounting; -import org.apache.flink.contrib.operatorstatistics.heavyhitters.CountMinHeavyHitter; -import org.apache.flink.contrib.operatorstatistics.heavyhitters.HeavyHitterMergeException; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.Map; - -/** - * Data structure that encapsulates statistical information of data that has only been processed by one pass - * This statistical information is meant to help determine the distribution of the data that has been processed - * in an operator so that we can determine if it is necessary to repartition the data - * - * The statistics to be gathered are configurable and represented by a {@link OperatorStatisticsConfig} object. - * - * The information encapsulated in this class is min, max, a structure enabling estimation of count distinct and a - * structure holding the heavy hitters along with their frequency. - * - */ -public class OperatorStatistics implements Serializable { - - OperatorStatisticsConfig config; - - transient Object min = null; - transient Object max = null; - long cardinality = 0; - transient ICardinality countDistinct; - transient HeavyHitter heavyHitter; - - public OperatorStatistics(OperatorStatisticsConfig config) { - this.config = config; - if (config.collectCountDistinct) { - if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)) { - countDistinct = new LinearCounting(config.getCountDbitmap()); - } - if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)) { - countDistinct = new HyperLogLog(config.getCountDlog2m()); - } - } - if (config.collectHeavyHitters) { - if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)) { - heavyHitter = - new LossyCounting(config.getHeavyHitterFraction(), config.getHeavyHitterError()); - } - if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH)) { - heavyHitter = - new CountMinHeavyHitter(config.getHeavyHitterFraction(), - config.getHeavyHitterError(), - config.getHeavyHitterConfidence(), - config.getHeavyHitterSeed()); - } - } - } - - public void process(Object tupleObject){ - if (tupleObject instanceof Comparable) { - if (config.collectMin && (min == null || ((Comparable) tupleObject).compareTo(min) < 0)) { - min = tupleObject; - } - if (config.collectMax && (max == null || ((Comparable) tupleObject).compareTo(max) > 0)) { - max = tupleObject; - } - } - if (config.collectCountDistinct){ - countDistinct.offer(tupleObject); - } - if (config.collectHeavyHitters){ - heavyHitter.addObject(tupleObject); - } - cardinality+=1; - } - - public void merge(OperatorStatistics other) throws RuntimeException { - if (config.collectMin & other.min!=null && (min==null || ((Comparable) other.min).compareTo(min) < 0)) { - this.min = other.min; - } - - if (config.collectMax && other.max!=null && (max == null || ((Comparable) other.max).compareTo(max) > 0)) { - this.max = other.max; - } - if (config.collectCountDistinct){ - try { - ICardinality mergedCountDistinct = this.countDistinct.merge(new ICardinality[]{this.countDistinct,other.countDistinct}); - this.countDistinct = mergedCountDistinct; - } catch (CardinalityMergeException e) { - throw new RuntimeException("Error merging count distinct structures",e); - } - } - if (config.collectHeavyHitters){ - try { - this.heavyHitter.merge(other.heavyHitter); - } catch (HeavyHitterMergeException e) { - throw new RuntimeException("Error merging heavy hitter structures",e); - } - } - this.cardinality+=other.cardinality; - } - - public Object getMin() { - return min; - } - - public Object getMax() { - return max; - } - - public long estimateCountDistinct(){ - return countDistinct.cardinality(); - } - - public Map<Object,Long> getHeavyHitters(){ - return heavyHitter.getHeavyHitters(); - } - - @Override - public String toString(){ - String out = "\ntotal cardinality: "+this.cardinality; - if (config.collectMax) { - out += "\nmax: " + this.max; - } - if (config.collectMin){ - out+="\nmin: "+this.min; - } - if (config.collectCountDistinct){ - if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG)){ - out+="\ncount distinct estimate("+config.countDistinctAlgorithm+ - "["+config.getCountDlog2m()+"]): "+ - this.countDistinct.cardinality(); - }else{ - out+="\ncount distinct estimate("+config.countDistinctAlgorithm+ - "["+config.getCountDbitmap()+"]): "+ - this.countDistinct.cardinality(); - } - } - if (config.collectHeavyHitters){ - if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)) { - out += "\nheavy hitters (" + config.heavyHitterAlgorithm + - "[" + config.getHeavyHitterFraction() + ", " - + config.getHeavyHitterError() + "]):"; - out += "\n" + heavyHitter.toString(); - }else { - out += "\nheavy hitters (" + config.heavyHitterAlgorithm + - "[" + config.getHeavyHitterFraction() + ", " - + config.getHeavyHitterError() + ", " - + config.getHeavyHitterConfidence() + "]):"; - out += "\n" + heavyHitter.toString(); - } - } - return out; - } - - @Override - public OperatorStatistics clone(){ - OperatorStatistics clone = new OperatorStatistics(config); - clone.min = min; - clone.max = max; - clone.cardinality = cardinality; - - try { - if (config.collectCountDistinct) { - ICardinality copy; - if (countDistinct instanceof LinearCounting) { - copy = new LinearCounting(config.getCountDbitmap()); - } else if (countDistinct instanceof HyperLogLog) { - copy = new HyperLogLog(config.getCountDlog2m()); - } else { - throw new IllegalStateException("Unsupported count distinct counter."); - } - clone.countDistinct = copy.merge(countDistinct); - } - } catch (CardinalityMergeException e) { - throw new RuntimeException("Faild to clone OperatorStatistics!"); - } - - try { - if (config.collectHeavyHitters) { - HeavyHitter copy; - if (heavyHitter instanceof LossyCounting) { - copy = new LossyCounting(config.getHeavyHitterFraction(), config.getHeavyHitterError()); - } else if (heavyHitter instanceof CountMinHeavyHitter) { - copy = new CountMinHeavyHitter(config.getHeavyHitterFraction(), - config.getHeavyHitterError(), - config.getHeavyHitterConfidence(), - config.getHeavyHitterSeed()); - } else { - throw new IllegalStateException("Unsupported heavy hitter counter."); - } - copy.merge(heavyHitter); - clone.heavyHitter = copy; - } - } catch (HeavyHitterMergeException e) { - throw new RuntimeException("Failed to clone OperatorStatistics!"); - } - - return clone; - } - - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - if (config.collectMin){ - out.writeObject(min); - } - if (config.collectMax){ - out.writeObject(max); - } - if (config.collectCountDistinct){ - if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)){ - out.writeObject(countDistinct.getBytes()); - }else{ - out.writeObject(countDistinct); - } - } - if (config.collectHeavyHitters){ - out.writeObject(heavyHitter); - } - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - if (config.collectMin){ - min = in.readObject(); - } - if (config.collectMax){ - max = in.readObject(); - } - if (config.collectCountDistinct){ - if (config.countDistinctAlgorithm.equals(OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING)){ - countDistinct = new LinearCounting((byte[])in.readObject()); - }else{ - countDistinct = (HyperLogLog)in.readObject(); - } - } - if (config.collectHeavyHitters) { - if (config.heavyHitterAlgorithm.equals(OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING)){ - heavyHitter = (LossyCounting)in.readObject(); - }else{ - heavyHitter = (CountMinHeavyHitter)in.readObject(); - } - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java deleted file mode 100644 index 857a403..0000000 --- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsAccumulator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.contrib.operatorstatistics; - -import org.apache.flink.api.common.accumulators.Accumulator; - -/** - * This accumulator wraps the class {@link OperatorStatistics} to track - * estimated values for count distinct and heavy hitters. - * - */ -public class OperatorStatisticsAccumulator implements Accumulator<Object, OperatorStatistics> { - - private OperatorStatistics local; - - public OperatorStatisticsAccumulator(){ - local = new OperatorStatistics(new OperatorStatisticsConfig()); - } - - public OperatorStatisticsAccumulator(OperatorStatisticsConfig config){ - local = new OperatorStatistics(config); - } - - @Override - public void add(Object value) { - local.process(value); - } - - @Override - public OperatorStatistics getLocalValue() { - return local; - } - - @Override - public void resetLocal() { - local = new OperatorStatistics(new OperatorStatisticsConfig()); - } - - @Override - public void merge(Accumulator<Object, OperatorStatistics> other) { - local.merge(other.getLocalValue()); - } - - @Override - public Accumulator<Object, OperatorStatistics> clone() { - OperatorStatisticsAccumulator clone = new OperatorStatisticsAccumulator(); - clone.local = this.local.clone(); - return clone; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java deleted file mode 100644 index d12b01d..0000000 --- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/OperatorStatisticsConfig.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics; - -import java.io.Serializable; - -/** - * Configures the behavior of an {@link OperatorStatistics} instance. - * - * Sets the parameters that determine the accuracy of the count distinct and heavy hitter sketches - * - * Defines the statistics to be collected. A boolean field indicates whether a given statistic should be collected or not - * - * Encapsulates an enum indicating which sketch should be used for count distinct and another indicating which sketch - * should be used for detecting heavy hitters. - */ - -public class OperatorStatisticsConfig implements Serializable { - - private int countDbitmap = 1000000; - private int countDlog2m = 10; - - private int heavyHitterSeed = 121311332; - private double heavyHitterConfidence = 0.99; - private double heavyHitterFraction = 0.05; - private double heavyHitterError = 0.0005; - - public boolean collectMin; - public boolean collectMax; - public boolean collectCountDistinct; - public boolean collectHeavyHitters; - public CountDistinctAlgorithm countDistinctAlgorithm; - public HeavyHitterAlgorithm heavyHitterAlgorithm; - - public OperatorStatisticsConfig(){ - this.collectMin = true; - this.collectMax = true; - this.collectCountDistinct = true; - this.collectHeavyHitters = true; - this.countDistinctAlgorithm = CountDistinctAlgorithm.HYPERLOGLOG; - this.heavyHitterAlgorithm = HeavyHitterAlgorithm.LOSSY_COUNTING; - } - - public OperatorStatisticsConfig(boolean collect){ - this.collectMin = collect; - this.collectMax = collect; - this.collectCountDistinct = collect; - this.collectHeavyHitters = collect; - this.countDistinctAlgorithm = CountDistinctAlgorithm.HYPERLOGLOG; //Defaut algorithm - this.heavyHitterAlgorithm = HeavyHitterAlgorithm.LOSSY_COUNTING; //Default algorithm - } - - public OperatorStatisticsConfig(CountDistinctAlgorithm countDistinct, HeavyHitterAlgorithm heavyHitter) { - this.collectMin = true; - this.collectMax = true; - this.collectCountDistinct = true; - this.collectHeavyHitters = true; - this.countDistinctAlgorithm = countDistinct; - this.heavyHitterAlgorithm = heavyHitter; - } - - public enum CountDistinctAlgorithm { - - LINEAR_COUNTING, - HYPERLOGLOG; - - } - - public enum HeavyHitterAlgorithm{ - LOSSY_COUNTING, - COUNT_MIN_SKETCH; - } - - public void setCountDbitmap(int countDbitmap) { - this.countDbitmap = countDbitmap; - } - - public void setCountDlog2m(int countDlog2m) { - this.countDlog2m = countDlog2m; - } - - public void setHeavyHitterConfidence(double heavyHitterConfidence) { - this.heavyHitterConfidence = heavyHitterConfidence; - } - - public void setHeavyHitterSeed(int heavyHitterSeed) { - this.heavyHitterSeed = heavyHitterSeed; - } - - public void setHeavyHitterFraction(double heavyHitterFraction) { - this.heavyHitterFraction = heavyHitterFraction; - } - - public void setHeavyHitterError(double heavyHitterError) { - this.heavyHitterError = heavyHitterError; - } - - public void setCollectMin(boolean collectMin) { - this.collectMin = collectMin; - } - - public void setCollectMax(boolean collectMax) { - this.collectMax = collectMax; - } - - public void setCollectCountDistinct(boolean collectCountDistinct) { - this.collectCountDistinct = collectCountDistinct; - } - - public void setCollectHeavyHitters(boolean collectHeavyHitters) { - this.collectHeavyHitters = collectHeavyHitters; - } - - public void setCountDistinctAlgorithm(CountDistinctAlgorithm countDistinctAlgorithm) { - this.countDistinctAlgorithm = countDistinctAlgorithm; - } - - public void setHeavyHitterAlgorithm(HeavyHitterAlgorithm heavyHitterAlgorithm) { - this.heavyHitterAlgorithm = heavyHitterAlgorithm; - } - - public int getCountDbitmap() { - return countDbitmap; - } - - public int getCountDlog2m() { - return countDlog2m; - } - - public int getHeavyHitterSeed() { - return heavyHitterSeed; - } - - public double getHeavyHitterConfidence() { - return heavyHitterConfidence; - } - - public double getHeavyHitterFraction() { - return heavyHitterFraction; - } - - public double getHeavyHitterError() { - return heavyHitterError; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java deleted file mode 100644 index 30a6c1b..0000000 --- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitter.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics.heavyhitters; - -import com.clearspring.analytics.hash.MurmurHash; -import com.clearspring.analytics.stream.frequency.CountMinSketch; - -import java.io.Serializable; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -/* - * This class tracks heavy hitters using the {@link com.clearspring.analytics.stream.frequency.CountMinSketch} structure - * to estimate frequencies. - */ -public class CountMinHeavyHitter implements HeavyHitter, Serializable { - - private transient CountMinSketch countMinSketch; - private HashMap<Object,Long> heavyHitters; - private double fraction; - private double error; - private long cardinality; - - public CountMinHeavyHitter(double fraction, double error, double confidence, int seed){ - this.countMinSketch = new CountMinSketch(error,confidence,seed); - this.error = error; - this.cardinality = 0; - this.fraction = fraction; - this.heavyHitters = new HashMap<Object,Long>(); - } - - public CountMinHeavyHitter(CountMinSketch countMinSketch, double fraction){ - this.countMinSketch = countMinSketch; - this.error = countMinSketch.getRelativeError(); - this.cardinality = 0; - this.fraction = fraction; - this.heavyHitters = new HashMap<Object,Long>(); - } - - @Override - public void addObject(Object o) { - cardinality +=1; - if (o instanceof Long){ - countMinSketch.add((Long)o, 1); - }else{ - countMinSketch.add(MurmurHash.hash(o), 1); - } - updateHeavyHitters(o); - } - - private void updateHeavyHitters(Object item){ - long minFrequency = (long)Math.ceil(cardinality * fraction); - long estimateCount = estimateCount(item); - - if (estimateCount >= minFrequency){ - heavyHitters.put(item, estimateCount); - } - - if (cardinality%(long)Math.ceil(1/error)==0) { - removeNonFrequent(minFrequency); - } - } - - private void removeNonFrequent(long minFrequency){ - Iterator it = heavyHitters.entrySet().iterator(); - while (it.hasNext()) { - if (((Map.Entry<Object,Long>)it.next()).getValue() < minFrequency) { - it.remove(); - } - } - } - - public long estimateCount(Object item){ - if (item instanceof Long){ - return countMinSketch.estimateCount((Long)item); - }else{ - return countMinSketch.estimateCount(MurmurHash.hash(item)); - } - } - - public void merge(HeavyHitter toMerge) throws CMHeavyHitterMergeException { - - try { - CountMinHeavyHitter cmToMerge = (CountMinHeavyHitter)toMerge; - if (this.fraction != cmToMerge.fraction) { - throw new CMHeavyHitterMergeException("The fraction for both heavy hitters must be the same"); - } - - this.countMinSketch = CountMinSketch.merge(this.countMinSketch, cmToMerge.countMinSketch); - - HashMap<Object,Long> mergedHeavyHitters = new HashMap<Object, Long>(); - - for (Map.Entry<Object, Long> entry : this.heavyHitters.entrySet()) { - mergedHeavyHitters.put(entry.getKey(), estimateCount(entry.getKey())); - } - - for (Map.Entry<Object, Long> entry : cmToMerge.heavyHitters.entrySet()) { - if (!mergedHeavyHitters.containsKey(entry.getKey())) { - mergedHeavyHitters.put(entry.getKey(), estimateCount(entry.getKey())); - } - } - this.heavyHitters = mergedHeavyHitters; - cardinality+=cmToMerge.cardinality; - - }catch (ClassCastException ex){ - throw new CMHeavyHitterMergeException("Both heavy hitter objects must belong to the same class", ex); - }catch (Exception ex){ - throw new CMHeavyHitterMergeException("Cannot merge count min sketches: ", ex); - } - } - - - @Override - public HashMap<Object,Long> getHeavyHitters() { - long minFrequency = (long)Math.ceil(cardinality * fraction); - removeNonFrequent(minFrequency); - return heavyHitters; - } - - protected static class CMHeavyHitterMergeException extends HeavyHitterMergeException { - - public CMHeavyHitterMergeException(String message) { - super(message); - } - - public CMHeavyHitterMergeException(String message, Throwable cause) { - super(message, cause); - } - } - - @Override - public String toString(){ - String out = ""; - Map<Object, Long> heavyHitters = getHeavyHitters(); - for (Map.Entry<Object, Long> entry : heavyHitters.entrySet()){ - out += entry.getKey().toString() + " -> estimated freq. " + entry.getValue() + "\n"; - } - return out; - } - - private void writeObject(ObjectOutputStream oos) throws IOException{ - oos.defaultWriteObject(); - oos.writeObject(CountMinSketch.serialize(countMinSketch)); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - byte[] countMinBytes = (byte[]) in.readObject(); - countMinSketch = CountMinSketch.deserialize(countMinBytes); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java deleted file mode 100644 index bf6f4bf..0000000 --- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitter.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics.heavyhitters; - -import java.util.HashMap; - -/** - * Interface for classes that track heavy hitters. It follows the same design as the interfaces from - * {@link com.clearspring.analytics.stream} - */ -public interface HeavyHitter { - - void addObject(Object o); - - HashMap getHeavyHitters(); - - void merge(HeavyHitter toMerge) throws HeavyHitterMergeException; - - String toString(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java deleted file mode 100644 index 7a65a53..0000000 --- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/HeavyHitterMergeException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.contrib.operatorstatistics.heavyhitters; - -/** - * Exception that is thrown when incompatible heavy hitter sketches attempt to be merged. - */ -public class HeavyHitterMergeException extends Exception { - - public HeavyHitterMergeException(String message) { - super(message); - } - - public HeavyHitterMergeException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java b/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java deleted file mode 100644 index ad74401..0000000 --- a/flink-contrib/flink-operator-stats/src/main/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCounting.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics.heavyhitters; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -/** - * Implements Lossy Counting algorithm (Manku, G.S., Motwani, R.: Approximate frequency counts over data streams, 2002) - * The algorithm tracks heavy hitters in a count based fashion. It stores heavy hitters, along with a lower bound for - * their frequency and an error, which determines the upperbound for the frequency. It is guaranteed to output all - * elements with freq. higher than a given threshold and to not output any element with a frequency under a given - * error threshold. - */ -public class LossyCounting implements HeavyHitter, Serializable{ - - private double fraction; - private double error; - private long cardinality; - private Map<Object,Counter> heavyHitters; - private long bucket; - - private class Counter implements Serializable { - long lowerBound; - long frequencyError; - - private Counter(long lowerBound, long frequencyError){ - this.lowerBound = lowerBound; - this.frequencyError = frequencyError; - } - - private void updateLowerBound(long count){ - lowerBound+=count; - } - - private long getUpperBound(){ - return lowerBound + frequencyError; - } - - } - - public LossyCounting(double fraction, double error){ - - this.fraction = fraction; - this.error = error; - this.cardinality = 0; - this.heavyHitters = new HashMap<Object, Counter>(); - this.bucket = 0; - } - - @Override - public void addObject(Object o) { - cardinality+=1; - if (heavyHitters.containsKey(o)){ - heavyHitters.get(o).updateLowerBound(1); - }else{ - heavyHitters.put(o,new Counter(1, bucket)); - } - if (cardinality%(long)Math.ceil(1/error)==0) { - bucket += 1; - updateHeavyHitters(); - } - } - - public void updateHeavyHitters(){ - Iterator it = heavyHitters.entrySet().iterator(); - while (it.hasNext()) { - if (((Map.Entry<Object,Counter>)it.next()).getValue().getUpperBound()< bucket) { - it.remove(); - } - } - } - - public void merge(HeavyHitter toMerge) throws HeavyHitterMergeException { - try{ - LossyCounting lsToMerge = (LossyCounting)toMerge; - if (this.fraction!=lsToMerge.fraction){ - throw new HeavyHitterMergeException("Both heavy hitter structures must be identical"); - } - this.cardinality+=lsToMerge.cardinality; - this.bucket = (long)Math.floor(cardinality*error); - for (Map.Entry<Object, Counter> entry : lsToMerge.heavyHitters.entrySet()){ - Counter counter = this.heavyHitters.get(entry.getKey()); - if (counter==null){ - this.heavyHitters.put(entry.getKey(),entry.getValue()); - }else{ - Counter mergingCounter = entry.getValue(); - this.heavyHitters.put(entry.getKey(), - new Counter(mergingCounter.lowerBound+counter.lowerBound, mergingCounter.frequencyError +counter.frequencyError)); - } - } - updateHeavyHitters(); - }catch (ClassCastException ex){ - throw new HeavyHitterMergeException("Both heavy hitter structures must be identical"); - } - } - - @Override - public HashMap<Object,Long> getHeavyHitters() { - HashMap<Object,Long> heavyHitterLowerBounds = new HashMap<Object, Long>(); - long minFrequency = (long)Math.ceil(cardinality*(fraction-error)); - for (Map.Entry<Object, Counter> entry : heavyHitters.entrySet()){ - if(entry.getValue().lowerBound>=minFrequency){ - heavyHitterLowerBounds.put(entry.getKey(), entry.getValue().lowerBound); - } - } - return heavyHitterLowerBounds; - } - - @Override - public String toString(){ - String out = ""; - long minFrequency = (long)Math.ceil(cardinality*(fraction-error)); - for (Map.Entry<Object, Counter> entry : heavyHitters.entrySet()){ - if(entry.getValue().lowerBound>=minFrequency) { - out += entry.getKey().toString() + " -> lower bound " + entry.getValue().lowerBound + "\n"; - } - } - return out; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java deleted file mode 100644 index 82123e0..0000000 --- a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/OperatorStatsAccumulatorTest.java +++ /dev/null @@ -1,281 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.AbstractTestBase; -import org.apache.flink.util.Collector; - -import org.junit.Assert; -import org.junit.Test; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.Serializable; -import java.util.Map; -import java.util.Random; - -@SuppressWarnings("serial") -public class OperatorStatsAccumulatorTest extends AbstractTestBase { - - private static final Logger LOG = LoggerFactory.getLogger(OperatorStatsAccumulatorTest.class); - - private static final String ACCUMULATOR_NAME = "op-stats"; - - public OperatorStatsAccumulatorTest() { - super(new Configuration()); - } - - public static class StringToInt extends RichFlatMapFunction<String, Tuple1<Integer>> { - - // Is instantiated later since the runtime context is not yet initialized - private Accumulator<Object, Serializable> globalAccumulator; - private Accumulator<Object,Serializable> localAccumulator; - OperatorStatisticsConfig accumulatorConfig; - - public StringToInt(OperatorStatisticsConfig config){ - accumulatorConfig = config; - } - - @Override - public void open(Configuration parameters) { - // Add globalAccumulator using convenience function - - globalAccumulator = getRuntimeContext().getAccumulator(ACCUMULATOR_NAME); - if (globalAccumulator==null){ - getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, new OperatorStatisticsAccumulator(accumulatorConfig)); - globalAccumulator = getRuntimeContext().getAccumulator(ACCUMULATOR_NAME); - } - - int subTaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - localAccumulator = getRuntimeContext().getAccumulator(ACCUMULATOR_NAME+"-"+subTaskIndex); - if (localAccumulator==null){ - getRuntimeContext().addAccumulator(ACCUMULATOR_NAME+"-"+subTaskIndex, new OperatorStatisticsAccumulator(accumulatorConfig)); - localAccumulator = getRuntimeContext().getAccumulator(ACCUMULATOR_NAME+"-"+subTaskIndex); - } - } - - @Override - public void flatMap(String value, Collector<Tuple1<Integer>> out) throws Exception { - int intValue; - try { - intValue = Integer.parseInt(value); - localAccumulator.add(intValue); - out.collect(new Tuple1<>(intValue)); - } catch (NumberFormatException ignored) {} - } - - @Override - public void close(){ - globalAccumulator.merge(localAccumulator); - } - } - - @Test - public void testAccumulatorAllStatistics() throws Exception { - - String input = ""; - - Random rand = new Random(); - - for (int i = 1; i < 1000; i++) { - if(rand.nextDouble()<0.2){ - input+=String.valueOf(rand.nextInt(4))+"\n"; - }else{ - input+=String.valueOf(rand.nextInt(100))+"\n"; - } - } - - String inputFile = createTempFile("datapoints.txt", input); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - - OperatorStatisticsConfig operatorStatisticsConfig = - new OperatorStatisticsConfig(OperatorStatisticsConfig.CountDistinctAlgorithm.HYPERLOGLOG, - OperatorStatisticsConfig.HeavyHitterAlgorithm.LOSSY_COUNTING); - - env.readTextFile(inputFile). - flatMap(new StringToInt(operatorStatisticsConfig)). - output(new DiscardingOutputFormat<Tuple1<Integer>>()); - - JobExecutionResult result = env.execute(); - - OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME); -// System.out.println("Global Stats"); -// System.out.println(globalStats.toString()); - - OperatorStatistics merged = null; - - Map<String,Object> accResults = result.getAllAccumulatorResults(); - for (String accumulatorName:accResults.keySet()){ - if (accumulatorName.contains(ACCUMULATOR_NAME+"-")){ - OperatorStatistics localStats = (OperatorStatistics) accResults.get(accumulatorName); - LOG.debug("Local Stats: " + accumulatorName); - LOG.debug(localStats.toString()); - if (merged == null){ - merged = localStats.clone(); - }else { - merged.merge(localStats); - } - } - } - - LOG.debug("Local Stats Merged: \n"); - LOG.debug(merged.toString()); - - Assert.assertEquals("Global cardinality should be 999", 999, globalStats.cardinality); - Assert.assertEquals("Count distinct estimate should be around 100 and is "+globalStats.estimateCountDistinct() - , 100.0, (double)globalStats.estimateCountDistinct(),5.0); - Assert.assertTrue("The total number of heavy hitters should be between 0 and 5." - , globalStats.getHeavyHitters().size() > 0 && globalStats.getHeavyHitters().size() <= 5); - Assert.assertEquals("Min when merging the local accumulators should correspond with min" + - "of the global accumulator",merged.getMin(),globalStats.getMin()); - Assert.assertEquals("Max resulting from merging the local accumulators should correspond to" + - "max of the global accumulator",merged.getMax(),globalStats.getMax()); - Assert.assertEquals("Count distinct when merging the local accumulators should correspond to " + - "count distinct in the global accumulator",merged.estimateCountDistinct(),globalStats.estimateCountDistinct()); - Assert.assertEquals("The number of heavy hitters when merging the local accumulators should correspond " + - "to the number of heavy hitters in the global accumulator",merged.getHeavyHitters().size(),globalStats.getHeavyHitters().size()); - } - - @Test - public void testAccumulatorMinMax() throws Exception { - - String input = ""; - - Random rand = new Random(); - - for (int i = 1; i < 1000; i++) { - if (rand.nextDouble() < 0.2) { - input += String.valueOf(rand.nextInt(4)) + "\n"; - } else { - input += String.valueOf(rand.nextInt(100)) + "\n"; - } - } - - String inputFile = createTempFile("datapoints.txt", input); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - - OperatorStatisticsConfig operatorStatisticsConfig = - new OperatorStatisticsConfig(false); - operatorStatisticsConfig.collectMax = true; - operatorStatisticsConfig.collectMin = true; - - env.readTextFile(inputFile). - flatMap(new StringToInt(operatorStatisticsConfig)). - output(new DiscardingOutputFormat<Tuple1<Integer>>()); - - JobExecutionResult result = env.execute(); - - OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME); -// System.out.println("Global Stats"); -// System.out.println(globalStats.toString()); - - Assert.assertTrue("Min value for accumulator should not be null",globalStats.getMin()!=null); - Assert.assertTrue("Max value for accumulator should not be null",globalStats.getMax()!=null); - } - - @Test - public void testAccumulatorCountDistinctLinearCounting() throws Exception { - - String input = ""; - - Random rand = new Random(); - - for (int i = 1; i < 1000; i++) { - if (rand.nextDouble() < 0.2) { - input += String.valueOf(rand.nextInt(4)) + "\n"; - } else { - input += String.valueOf(rand.nextInt(100)) + "\n"; - } - } - - String inputFile = createTempFile("datapoints.txt", input); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - - OperatorStatisticsConfig operatorStatisticsConfig = - new OperatorStatisticsConfig(false); - operatorStatisticsConfig.collectCountDistinct = true; - operatorStatisticsConfig.countDistinctAlgorithm = OperatorStatisticsConfig.CountDistinctAlgorithm.LINEAR_COUNTING; - operatorStatisticsConfig.setCountDbitmap(10000); - - env.readTextFile(inputFile). - flatMap(new StringToInt(operatorStatisticsConfig)). - output(new DiscardingOutputFormat<Tuple1<Integer>>()); - - JobExecutionResult result = env.execute(); - - OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME); -// System.out.println("Global Stats"); -// System.out.println(globalStats.toString()); - - Assert.assertTrue("Count Distinct for accumulator should not be null",globalStats.countDistinct!=null); - } - - @Test - public void testAccumulatorHeavyHitterCountMinSketch() throws Exception { - - String input = ""; - - Random rand = new Random(); - - for (int i = 1; i < 1000; i++) { - if (rand.nextDouble() < 0.2) { - input += String.valueOf(rand.nextInt(4)) + "\n"; - } else { - input += String.valueOf(rand.nextInt(100)) + "\n"; - } - } - - String inputFile = createTempFile("datapoints.txt", input); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().disableSysoutLogging(); - - OperatorStatisticsConfig operatorStatisticsConfig = - new OperatorStatisticsConfig(false); - operatorStatisticsConfig.collectHeavyHitters = true; - operatorStatisticsConfig.heavyHitterAlgorithm = OperatorStatisticsConfig.HeavyHitterAlgorithm.COUNT_MIN_SKETCH; - - env.readTextFile(inputFile). - flatMap(new StringToInt(operatorStatisticsConfig)). - output(new DiscardingOutputFormat<Tuple1<Integer>>()); - - JobExecutionResult result = env.execute(); - - OperatorStatistics globalStats = result.getAccumulatorResult(ACCUMULATOR_NAME); -// System.out.println("Global Stats"); -// System.out.println(globalStats.toString()); - - Assert.assertTrue("Count Distinct for accumulator should not be null",globalStats.heavyHitter!=null); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java deleted file mode 100644 index 9deaec6..0000000 --- a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/CountMinHeavyHitterTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics.heavyhitters; - -import com.clearspring.analytics.stream.frequency.CountMinSketch; -import org.junit.Test; - -import java.util.Iterator; -import java.util.Map; -import java.util.Random; - -import static org.junit.Assert.assertTrue; - -/** - * Test the structure built to track heavy hitters using the count min sketch from - * {@link com.clearspring.analytics.stream.frequency.CountMinSketch} - */ -public class CountMinHeavyHitterTest { - - static final double fraction = 0.01; - static final double error = 0.005; - static final double confidence = 0.99; - static final int seed = 7362181; - static final Random r = new Random(); - static final int cardinality = 1000000; - static final int maxScale = 100000; - - @Test - public void testAccuracy() { - - long[] actualFreq = new long[maxScale]; - - CountMinHeavyHitter cmTopK = new CountMinHeavyHitter(fraction,error,confidence,seed); - - for (int i = 0; i < cardinality; i++) { - int value; - if (r.nextDouble()<0.1){ - value = r.nextInt(10); - }else{ - value = r.nextInt(maxScale); - } - cmTopK.addObject(value); - actualFreq[value]++; - } - - long frequency = (long)Math.ceil(cardinality* fraction); - for (int i=0;i<actualFreq.length;i++){ - if (actualFreq[i]>frequency){ - assertTrue("Heavy Hitter not found :" + i +","+ actualFreq[i], cmTopK.getHeavyHitters().containsKey(i)); - } - } - - Iterator it = cmTopK.getHeavyHitters().entrySet().iterator(); - while (it.hasNext()) { - Map.Entry heavyHitter = (Map.Entry)it.next(); - Long estimateError = (Long)heavyHitter.getValue() - actualFreq[(Integer)heavyHitter.getKey()]; - assertTrue("Difference between real frequency and estimate is too large: " + estimateError, - estimateError < (error*cardinality)); - } - } - - @Test - public void merge() throws CountMinHeavyHitter.CMHeavyHitterMergeException { - - int numToMerge = 5; - - long[] actualFreq = new long[maxScale]; - CountMinHeavyHitter merged = new CountMinHeavyHitter(fraction,error,confidence,seed); - long totalCardinality = 0; - - CountMinHeavyHitter[] sketches = new CountMinHeavyHitter[numToMerge]; - for (int i = 0; i < numToMerge; i++) { - CountMinSketch cms = new CountMinSketch(error, confidence, seed); - sketches[i] = new CountMinHeavyHitter(cms, fraction); - for (int j = 0; j < cardinality; j++) { - int val; - if (r.nextDouble()<0.1){ - val = r.nextInt(10); - }else{ - val = r.nextInt(maxScale); - } - sketches[i].addObject(val); - actualFreq[val]++; - totalCardinality++; - } - merged.merge(sketches[i]); - } - - Map<Object,Long> mergedHeavyHitters = merged.getHeavyHitters(); - long frequency = (long)(totalCardinality*fraction); - - for (int i = 0; i < actualFreq.length; ++i) { - if (actualFreq[i] >= frequency) { - assertTrue("All items with freq. > s.n will be found. Item " + i + ". Real freq. " + actualFreq[i] + " Expected freq." + frequency, mergedHeavyHitters.containsKey(i)); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java b/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java deleted file mode 100644 index 502b361..0000000 --- a/flink-contrib/flink-operator-stats/src/test/java/org/apache/flink/contrib/operatorstatistics/heavyhitters/LossyCountingTest.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.operatorstatistics.heavyhitters; - -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; -import java.util.Random; - -import static org.junit.Assert.assertTrue; - -/* -* Test the structure implemented for Lossy Counting -*/ - -public class LossyCountingTest { - - private static final Logger LOG = LoggerFactory.getLogger(LossyCountingTest.class); - - static final double fraction = 0.01; - static final double error = 0.005; - static final Random r = new Random(); - static final int cardinality = 1000000; - static final int maxScale = 100000; - - @Test - public void testAccuracy() { - - long[] actualFreq = new long[maxScale]; - - LossyCounting lossyCounting = new LossyCounting(fraction,error); - - for (int i = 0; i < cardinality; i++) { - int value; - if (r.nextDouble()<0.1){ - value = r.nextInt(10); - }else{ - value = r.nextInt(maxScale); - } - lossyCounting.addObject(value); - actualFreq[value]++; - } - - LOG.debug("Size of heavy hitters: "+lossyCounting.getHeavyHitters().size()); - LOG.debug(lossyCounting.toString()); - - Map<Object,Long> heavyHitters = lossyCounting.getHeavyHitters(); - long frequency = (long)Math.ceil(cardinality* fraction); - long minFrequency = (long)Math.ceil(cardinality* (fraction-error)); - - for (int i = 0; i < actualFreq.length; ++i) { - if (actualFreq[i]>=frequency) { - assertTrue("All items with freq. > s.n will be found. Item " + i + ". Real freq. " + actualFreq[i] + " Expected freq." + frequency, heavyHitters.containsKey(i)); - } - if (heavyHitters.containsKey(i)){ - assertTrue("no item with freq. < (s-e).n will be found. Item " + i + ". Real freq. " + actualFreq[i]+" Min freq."+ minFrequency, actualFreq[i]>=minFrequency); - assertTrue("the estimated freq. underestimates the true freq. by < e.n. Real freq. " + actualFreq[i] + " Lower bound "+heavyHitters.get(i), - Math.abs(heavyHitters.get(i)-actualFreq[i]) < error*cardinality); - } - } - } - - @Test - public void merge() throws HeavyHitterMergeException { - int numToMerge = 5; - LossyCounting merged = new LossyCounting(fraction,error); - LossyCounting[] sketches = new LossyCounting[numToMerge]; - - long[] actualFreq = new long[maxScale]; - long totalCardinality = 0; - - for (int i = 0; i < numToMerge; i++) { - sketches[i] = new LossyCounting(fraction,error); - for (int j = 0; j < cardinality; j++) { - int val; - if (r.nextDouble()<0.1){ - val = r.nextInt(10); - }else{ - val = r.nextInt(maxScale); - } - sketches[i].addObject(val); - actualFreq[val]++; - totalCardinality++; - } - merged.merge(sketches[i]); - } - - System.out.println("\nMERGED\n" + merged.toString()); - - Map<Object,Long> mergedHeavyHitters = merged.getHeavyHitters(); - long frequency = (long)(totalCardinality*fraction); - long minFrequency = (long)(totalCardinality*(fraction-error)); - - System.out.println("Frequency Threshold:" + frequency); - System.out.println("False positive Threshold:" + minFrequency); - System.out.println("Frequency of 14:" + actualFreq[14]); - - System.out.println("Real frequent items: "); - - for (int i = 0; i < actualFreq.length; ++i) { - if (actualFreq[i]>=frequency) { - System.out.println(i+": "+actualFreq[i]); - assertTrue("All items with freq. > s.n will be found. Item " + i + ". Real freq. " + actualFreq[i]+" Expected freq."+ frequency, mergedHeavyHitters.containsKey(i)); - } - if (mergedHeavyHitters.containsKey(i)){ - assertTrue("no item with freq. < (s-e).n will be found. Item " + i + ". Real freq. " + actualFreq[i]+" Min freq."+ minFrequency, actualFreq[i]>=minFrequency); - assertTrue("the estimated freq. underestimates the true freq. by < e.n. Real freq. " + actualFreq[i] + " Lower bound "+mergedHeavyHitters.get(i), - Math.abs(mergedHeavyHitters.get(i)-actualFreq[i]) < error*cardinality); - } - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d5b97b07/flink-contrib/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/pom.xml b/flink-contrib/pom.xml index be34446..6b01acd 100644 --- a/flink-contrib/pom.xml +++ b/flink-contrib/pom.xml @@ -41,7 +41,6 @@ under the License. <module>flink-storm-examples</module> <module>flink-streaming-contrib</module> <module>flink-tweet-inputformat</module> - <module>flink-operator-stats</module> <module>flink-connector-wikiedits</module> <module>flink-statebackend-rocksdb</module> </modules>
