[BEAM-2728] Extension for sketch-based statistics : HyperLogLog
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fd58a423 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fd58a423 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fd58a423 Branch: refs/heads/mr-runner Commit: fd58a423099b5aea5cd78c862e81c6a03bbf6521 Parents: 4f4632c Author: Arnaud Fournier <arnaudfournier...@gmail.com> Authored: Thu Jul 20 16:57:38 2017 +0200 Committer: Eugene Kirpichov <ekirpic...@gmail.com> Committed: Fri Nov 3 15:14:57 2017 -0700 ---------------------------------------------------------------------- pom.xml | 6 + sdks/java/extensions/pom.xml | 1 + sdks/java/extensions/sketching/pom.xml | 104 ++++ .../sketching/ApproximateDistinct.java | 573 +++++++++++++++++++ .../sdk/extensions/sketching/package-info.java | 22 + .../sketching/ApproximateDistinctTest.java | 209 +++++++ sdks/java/javadoc/pom.xml | 5 + 7 files changed, 920 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b2ab5d7..baed9ba 100644 --- a/pom.xml +++ b/pom.xml @@ -502,6 +502,12 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-sketching</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-sorter</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml index ec6efb6..5e8d495 100644 --- a/sdks/java/extensions/pom.xml +++ b/sdks/java/extensions/pom.xml @@ -36,6 +36,7 @@ <module>jackson</module> 
<module>join-library</module> <module>protobuf</module> + <module>sketching</module> <module>sorter</module> <module>sql</module> </modules> http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sketching/pom.xml b/sdks/java/extensions/sketching/pom.xml new file mode 100755 index 0000000..f0538ae --- /dev/null +++ b/sdks/java/extensions/sketching/pom.xml @@ -0,0 +1,104 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-parent</artifactId> + <version>2.3.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>beam-sdks-java-extensions-sketching</artifactId> + <name>Apache Beam :: SDKs :: Java :: Extensions :: Sketching</name> + + <properties> + <streamlib.version>2.9.5</streamlib.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + </dependency> + + <dependency> + <groupId>com.clearspring.analytics</groupId> + <artifactId>stream</artifactId> + <version>${streamlib.version}</version> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>com.google.auto.value</groupId> + <artifactId>auto-value</artifactId> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <!-- https://mvnrepository.com/artifact/org.apache.avro/avro --> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> + <artifactId>beam-runners-direct-java</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + 
<groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java new file mode 100644 index 0000000..1da0cc3 --- /dev/null +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java @@ -0,0 +1,573 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.beam.sdk.extensions.sketching;

import static com.google.common.base.Preconditions.checkArgument;

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.google.auto.value.AutoValue;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * {@link PTransform}s for computing the approximate number of distinct elements in a stream.
 *
 * <p>This class relies on the HyperLogLog algorithm, and more precisely on HyperLogLog+, Google's
 * improved version of it.
 *
 * <h2>References</h2>
 *
 * <p>The implementation comes from <a href="https://github.com/addthis/stream-lib">AddThis'
 * Stream-lib library</a>. <br>
 * The original HyperLogLog paper is available <a
 * href="http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf">here</a>. <br>
 * Another paper giving a clearer view of the algorithm is available <a
 * href="http://cscubs.cs.uni-bonn.de/2016/proceedings/paper-03.pdf">here</a>. <br>
 * Google's HyperLogLog+ version is detailed in <a
 * href="https://research.google.com/pubs/pub40671.html">this paper</a>.
 *
 * <h2>Parameters</h2>
 *
 * <p>Two parameters can be tuned in order to control the computation's accuracy:
 *
 * <ul>
 *   <li><b>Precision: {@code p}</b> <br>
 *       Controls the accuracy of the estimation. The precision determines the number of buckets
 *       ({@code 2^p}) used to store information about the distinct elements. <br>
 *       In general one can expect a relative error of about {@code 1.1 / sqrt(2^p)}. The value
 *       must be at least 4 to guarantee a minimal accuracy. <br>
 *       By default, the precision is set to {@code 12} for a relative error of around {@code 2%}.
 *   <li><b>Sparse Precision: {@code sp}</b> <br>
 *       Used to create a sparse representation in order to optimize memory and improve accuracy at
 *       small cardinalities. <br>
 *       The value of {@code sp} must be greater than {@code p}, but lower than 32. <br>
 *       By default, the sparse representation is not used ({@code sp = 0}). One should use it if
 *       the cardinality may be less than {@code 12000}.
 * </ul>
 *
 * <h2>Examples</h2>
 *
 * <p>There are 2 ways of using this class:
 *
 * <ul>
 *   <li>Use the {@link PTransform}s that return the estimated number of distinct elements, either
 *       for a whole {@link PCollection} of objects ({@code PCollection<Long>}) or for each key in
 *       a {@link PCollection} of {@link KV}s ({@code PCollection<KV<K, Long>>}).
 *   <li>Use the {@link ApproximateDistinctFn} {@code CombineFn} that is exposed in order to do
 *       advanced processing involving the {@link HyperLogLogPlus} structure which summarizes the
 *       stream.
 * </ul>
 *
 * <h3>Using the Transforms</h3>
 *
 * <h4>Example 1: globally default use</h4>
 *
 * <pre>{@code
 * PCollection<Integer> input = ...;
 * PCollection<Long> cardinality = input.apply(ApproximateDistinct.<Integer>globally());
 * }</pre>
 *
 * <h4>Example 2: per key default use</h4>
 *
 * <pre>{@code
 * PCollection<KV<Integer, String>> input = ...;
 * PCollection<KV<Integer, Long>> cardinalities = input.apply(ApproximateDistinct
 *                .<Integer, String>perKey());
 * }</pre>
 *
 * <h4>Example 3: tune precision and use sparse representation</h4>
 *
 * <p>One can tune the precision and sparse precision parameters in order to control the accuracy
 * and the memory. The tuning works exactly the same for {@link #globally()} and {@link #perKey()}.
 *
 * <pre>{@code
 * int precision = 15;
 * int sparsePrecision = 25;
 * PCollection<Double> input = ...;
 * PCollection<Long> cardinality = input.apply(ApproximateDistinct
 *                .<Double>globally()
 *                .withPrecision(precision)
 *                .withSparsePrecision(sparsePrecision));
 * }</pre>
 *
 * <h3>Using the {@link ApproximateDistinctFn} CombineFn</h3>
 *
 * <p>The CombineFn does the same thing as the transform but it can be used in cases where you want
 * to manipulate the {@link HyperLogLogPlus} sketch, for example if you want to store it in a
 * database to have a backup. It can also be used in stateful processing or in {@link
 * org.apache.beam.sdk.transforms.CombineFns.ComposedCombineFn}.
 *
 * <h4>Example 1: basic use</h4>
 *
 * <p>This example is not really interesting but shows how to properly create an {@link
 * ApproximateDistinctFn}. One must always specify a coder using the {@link
 * ApproximateDistinctFn#create(Coder)} method.
 *
 * <pre>{@code
 * PCollection<Integer> input = ...;
 * PCollection<HyperLogLogPlus> output = input.apply(Combine.globally(ApproximateDistinctFn
 *                 .<Integer>create(BigEndianIntegerCoder.of())));
 * }</pre>
 *
 * <h4>Example 2: use the {@link CombineFn} in a stateful {@link ParDo}</h4>
 *
 * <p>One may want to use the {@link ApproximateDistinctFn} in a stateful ParDo in order to do some
 * processing depending on the current cardinality of the stream. Stateful processing requires a
 * keyed input, so the sketch below is maintained per key. <br>
 * For more information about stateful processing see the blog post on this topic <a
 * href="https://beam.apache.org/blog/2017/02/13/stateful-processing.html">here</a>.
 *
 * <p>Here is an example of {@link DoFn} using an {@link ApproximateDistinctFn} as a {@link
 * org.apache.beam.sdk.state.CombiningState}:
 *
 * <pre><code>
 * {@literal class StatefulCardinality<K, V> extends DoFn<KV<K, V>, OutputT>} {
 *   {@literal @StateId}("hllSketch")
 *   {@literal private final StateSpec<CombiningState<V, HyperLogLogPlus, HyperLogLogPlus>>}
 *       hllSketchSpec;
 *
 *   {@literal public StatefulCardinality(ApproximateDistinctFn<V> fn)} {
 *     hllSketchSpec = StateSpecs.combining(fn);
 *   }
 *
 *   {@literal @ProcessElement}
 *   public void processElement(
 *       ProcessContext context,
 *       {@literal @StateId}("hllSketch")
 *       {@literal CombiningState<V, HyperLogLogPlus, HyperLogLogPlus> hllSketch)} {
 *     long current = hllSketch.getAccum().cardinality();
 *     hllSketch.add(context.element().getValue());
 *     context.output(...);
 *   }
 * }
 * </code></pre>
 *
 * <p>Then the {@link DoFn} can be called like this:
 *
 * <pre>{@code
 * PCollection<KV<K, V>> input = ...;
 * KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
 * ApproximateDistinctFn<V> myFn = ApproximateDistinctFn.create(inputCoder.getValueCoder());
 * PCollection<OutputT> output = input.apply(ParDo.of(new StatefulCardinality<>(myFn)));
 * }</pre>
 *
 * <h4>Example 3: use the {@link RetrieveCardinality} utility class</h4>
 *
 * <p>One may want to retrieve the cardinality as a long after doing some advanced processing using
 * the {@link HyperLogLogPlus} structure. <br>
 * The {@link RetrieveCardinality} utility class provides an easy way to do so:
 *
 * <pre>{@code
 * PCollection<MyObject> input = ...;
 * PCollection<HyperLogLogPlus> hll = input.apply(Combine.globally(ApproximateDistinctFn
 *                  .<MyObject>create(new MyObjectCoder())
 *                  .withSparseRepresentation(20)));
 *
 * // Some advanced processing
 * PCollection<SomeObject> advancedResult = hll.apply(...);
 *
 * PCollection<Long> cardinality =
 *     hll.apply(ParDo.of(ApproximateDistinct.RetrieveCardinality.globally()));
 *
 * }</pre>
 *
 * <p><b>Warning: this class is experimental.</b> Its API is subject to change in future versions of
 * Beam. For example, it may be merged with the {@link
 * org.apache.beam.sdk.transforms.ApproximateUnique} transform.
 */
@Experimental
public final class ApproximateDistinct {

  /**
   * Computes the approximate number of distinct elements in the input {@code PCollection<InputT>}
   * and returns a {@code PCollection<Long>}.
   *
   * @param <InputT> the type of the elements in the input {@link PCollection}
   */
  public static <InputT> GloballyDistinct<InputT> globally() {
    return GloballyDistinct.<InputT>builder().build();
  }

  /**
   * Like {@link #globally} but per key, i.e. computes the approximate number of distinct values
   * per key in a {@code PCollection<KV<K, V>>} and returns {@code PCollection<KV<K, Long>>}.
   *
   * @param <K> type of the keys mapping the elements
   * @param <V> type of the values being combined per key
   */
  public static <K, V> PerKeyDistinct<K, V> perKey() {
    return PerKeyDistinct.<K, V>builder().build();
  }

  /**
   * Implementation of {@link #globally()}.
   *
   * @param <InputT> the type of the elements in the input {@link PCollection}
   */
  @AutoValue
  public abstract static class GloballyDistinct<InputT>
      extends PTransform<PCollection<InputT>, PCollection<Long>> {

    abstract int precision();

    abstract int sparsePrecision();

    abstract Builder<InputT> toBuilder();

    /** Returns a builder preset with the defaults {@code p = 12} and {@code sp = 0}. */
    static <InputT> Builder<InputT> builder() {
      return new AutoValue_ApproximateDistinct_GloballyDistinct.Builder<InputT>()
          .setPrecision(12)
          .setSparsePrecision(0);
    }

    @AutoValue.Builder
    abstract static class Builder<InputT> {
      abstract Builder<InputT> setPrecision(int p);

      abstract Builder<InputT> setSparsePrecision(int sp);

      abstract GloballyDistinct<InputT> build();
    }

    /**
     * Returns a copy of this transform using precision {@code p}. The value is validated when the
     * transform is expanded, by {@link ApproximateDistinctFn#withPrecision(int)}.
     */
    public GloballyDistinct<InputT> withPrecision(int p) {
      return toBuilder().setPrecision(p).build();
    }

    /**
     * Returns a copy of this transform using sparse precision {@code sp}. The value is validated
     * when the transform is expanded, by {@link ApproximateDistinctFn#withSparseRepresentation(int)}.
     */
    public GloballyDistinct<InputT> withSparsePrecision(int sp) {
      return toBuilder().setSparsePrecision(sp).build();
    }

    @Override
    public PCollection<Long> expand(PCollection<InputT> input) {
      return input
          .apply(
              "Compute HyperLogLog Structure",
              Combine.globally(
                  ApproximateDistinctFn.<InputT>create(input.getCoder())
                      .withPrecision(this.precision())
                      .withSparseRepresentation(this.sparsePrecision())))
          .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.globally()));
    }
  }

  /**
   * Implementation of {@link #perKey()}.
   *
   * @param <K> type of the keys mapping the elements
   * @param <V> type of the values being combined per key
   */
  @AutoValue
  public abstract static class PerKeyDistinct<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> {

    abstract int precision();

    abstract int sparsePrecision();

    abstract Builder<K, V> toBuilder();

    /** Returns a builder preset with the defaults {@code p = 12} and {@code sp = 0}. */
    static <K, V> Builder<K, V> builder() {
      return new AutoValue_ApproximateDistinct_PerKeyDistinct.Builder<K, V>()
          .setPrecision(12)
          .setSparsePrecision(0);
    }

    @AutoValue.Builder
    abstract static class Builder<K, V> {
      abstract Builder<K, V> setPrecision(int p);

      abstract Builder<K, V> setSparsePrecision(int sp);

      abstract PerKeyDistinct<K, V> build();
    }

    /**
     * Returns a copy of this transform using precision {@code p}. The value is validated when the
     * transform is expanded, by {@link ApproximateDistinctFn#withPrecision(int)}.
     */
    public PerKeyDistinct<K, V> withPrecision(int p) {
      return toBuilder().setPrecision(p).build();
    }

    /**
     * Returns a copy of this transform using sparse precision {@code sp}. The value is validated
     * when the transform is expanded, by {@link ApproximateDistinctFn#withSparseRepresentation(int)}.
     */
    public PerKeyDistinct<K, V> withSparsePrecision(int sp) {
      return toBuilder().setSparsePrecision(sp).build();
    }

    /**
     * Combines the values of each key into a {@link HyperLogLogPlus} sketch, then extracts its
     * cardinality.
     *
     * @throws IllegalArgumentException if the input coder is not a {@link KvCoder}, since the
     *     value coder is needed to hash the values
     */
    @Override
    public PCollection<KV<K, Long>> expand(PCollection<KV<K, V>> input) {
      Coder<KV<K, V>> coder = input.getCoder();
      checkArgument(
          coder instanceof KvCoder,
          "Expected a KvCoder for the input PCollection. Actual: %s",
          coder);
      KvCoder<K, V> inputCoder = (KvCoder<K, V>) coder;
      return input
          .apply(
              "Compute HyperLogLog Structure",
              Combine.<K, V, HyperLogLogPlus>perKey(
                  ApproximateDistinctFn.<V>create(inputCoder.getValueCoder())
                      .withPrecision(this.precision())
                      .withSparseRepresentation(this.sparsePrecision())))
          .apply("Retrieve Cardinality", ParDo.of(RetrieveCardinality.<K>perKey()));
    }
  }

  /**
   * Implements the {@link CombineFn} of {@link ApproximateDistinct} transforms.
   *
   * <p>Each input element is encoded with the input coder and the resulting bytes are offered to a
   * {@link HyperLogLogPlus} sketch, which is used both as the accumulator and as the output.
   *
   * @param <InputT> the type of the elements in the input {@link PCollection}
   */
  public static class ApproximateDistinctFn<InputT>
      extends CombineFn<InputT, HyperLogLogPlus, HyperLogLogPlus> {

    private final int p;

    private final int sp;

    private final Coder<InputT> inputCoder;

    private ApproximateDistinctFn(int p, int sp, Coder<InputT> coder) {
      this.p = p;
      this.sp = sp;
      inputCoder = coder;
    }

    /**
     * Returns an {@link ApproximateDistinctFn} combiner with the given input coder, precision
     * {@code 12} and no sparse representation.
     *
     * @param coder the coder that encodes the elements' type
     * @throws IllegalArgumentException if {@code coder} is not deterministic, since equal elements
     *     must be encoded to equal bytes to be counted once
     */
    public static <InputT> ApproximateDistinctFn<InputT> create(Coder<InputT> coder) {
      try {
        coder.verifyDeterministic();
      } catch (Coder.NonDeterministicException e) {
        throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e);
      }
      return new ApproximateDistinctFn<>(12, 0, coder);
    }

    /**
     * Returns a new {@link ApproximateDistinctFn} combiner with a new precision {@code p}.
     *
     * <p>Keep in mind that {@code p} cannot be lower than 4, because the estimation would be too
     * inaccurate. If a sparse representation is already configured, {@code p} must also stay
     * strictly lower than its sparse precision.
     *
     * <p>See {@link ApproximateDistinct#precisionForRelativeError(double)} and {@link
     * ApproximateDistinct#relativeErrorForPrecision(int)} to have more information about the
     * relationship between precision and relative error.
     *
     * @param p the precision value for the normal representation
     * @throws IllegalArgumentException if {@code p < 4}, or if a sparse precision is set and
     *     {@code p} is not lower than it
     */
    public ApproximateDistinctFn<InputT> withPrecision(int p) {
      checkArgument(p >= 4, "Expected: p >= 4. Actual: p = %s", p);
      // Keep the invariant enforced by withSparseRepresentation: sp == 0 or p < sp.
      checkArgument(
          this.sp == 0 || this.sp > p,
          "Expected: p < sp when the sparse representation is enabled. Actual: p = %s, sp = %s",
          p,
          this.sp);
      return new ApproximateDistinctFn<>(p, this.sp, this.inputCoder);
    }

    /**
     * Returns a new {@link ApproximateDistinctFn} combiner with a sparse representation of
     * precision {@code sp}.
     *
     * <p>{@code sp} must be strictly between the current precision {@code p} and 32, or {@code 0}
     * to disable the sparse representation.
     *
     * <p>For more information about the sparse representation, read Google's paper available <a
     * href="https://research.google.com/pubs/pub40671.html">here</a>.
     *
     * @param sp the precision of HyperLogLog+'s sparse representation
     * @throws IllegalArgumentException if {@code sp} is neither {@code 0} nor in {@code ]p, 32[}
     */
    public ApproximateDistinctFn<InputT> withSparseRepresentation(int sp) {
      checkArgument(
          (sp > this.p && sp < 32) || (sp == 0),
          "Expected: p < sp < 32 or sp = 0. Actual: p = %s, sp = %s",
          this.p,
          sp);
      return new ApproximateDistinctFn<>(this.p, sp, this.inputCoder);
    }

    @Override
    public HyperLogLogPlus createAccumulator() {
      return new HyperLogLogPlus(p, sp);
    }

    /** Offers the encoded bytes of {@code record} to the sketch and returns the same sketch. */
    @Override
    public HyperLogLogPlus addInput(HyperLogLogPlus acc, InputT record) {
      try {
        acc.offer(CoderUtils.encodeToByteArray(inputCoder, record));
      } catch (CoderException e) {
        throw new IllegalStateException("The input value cannot be encoded: " + e.getMessage(), e);
      }
      return acc;
    }

    /** Output the whole structure so it can be queried, reused or stored easily. */
    @Override
    public HyperLogLogPlus extractOutput(HyperLogLogPlus accumulator) {
      return accumulator;
    }

    /** Merges all accumulators into a freshly created sketch with this fn's {@code p} and {@code sp}. */
    @Override
    public HyperLogLogPlus mergeAccumulators(Iterable<HyperLogLogPlus> accumulators) {
      HyperLogLogPlus mergedAccum = createAccumulator();
      for (HyperLogLogPlus accum : accumulators) {
        try {
          mergedAccum.addAll(accum);
        } catch (CardinalityMergeException e) {
          // Should never happen because only HyperLogLogPlus accumulators are instantiated.
          throw new IllegalStateException(
              "The accumulators cannot be merged: " + e.getMessage(), e);
        }
      }
      return mergedAccum;
    }

    /** Encodes accumulators with the dedicated {@link HyperLogLogPlusCoder}. */
    @Override
    public Coder<HyperLogLogPlus> getAccumulatorCoder(
        CoderRegistry registry, Coder<InputT> inputCoder) {
      return HyperLogLogPlusCoder.of();
    }

    /** Encodes outputs with the dedicated {@link HyperLogLogPlusCoder}. */
    @Override
    public Coder<HyperLogLogPlus> getDefaultOutputCoder(
        CoderRegistry registry, Coder<InputT> inputCoder) {
      return HyperLogLogPlusCoder.of();
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      builder
          .add(DisplayData.item("p", p).withLabel("precision"))
          .add(DisplayData.item("sp", sp).withLabel("sparse representation precision"));
    }
  }

  /**
   * Coder for {@link HyperLogLogPlus} class.
   *
   * <p>Encodes a sketch as the bytes returned by {@link HyperLogLogPlus#getBytes()}, written with
   * a {@link ByteArrayCoder}, and decodes them with {@code HyperLogLogPlus.Builder.build}.
   */
  public static class HyperLogLogPlusCoder extends CustomCoder<HyperLogLogPlus> {

    private static final HyperLogLogPlusCoder INSTANCE = new HyperLogLogPlusCoder();

    private static final ByteArrayCoder BYTE_ARRAY_CODER = ByteArrayCoder.of();

    /** Returns the shared coder instance. */
    public static HyperLogLogPlusCoder of() {
      return INSTANCE;
    }

    @Override
    public void encode(HyperLogLogPlus value, OutputStream outStream) throws IOException {
      if (value == null) {
        throw new CoderException("cannot encode a null HyperLogLogPlus sketch");
      }
      BYTE_ARRAY_CODER.encode(value.getBytes(), outStream);
    }

    @Override
    public HyperLogLogPlus decode(InputStream inStream) throws IOException {
      return HyperLogLogPlus.Builder.build(BYTE_ARRAY_CODER.decode(inStream));
    }

    @Override
    public boolean isRegisterByteSizeObserverCheap(HyperLogLogPlus value) {
      return true;
    }

    /**
     * Returns {@link HyperLogLogPlus#sizeof()} as the encoded size.
     *
     * <p>NOTE(review): {@code encode} writes {@code value.getBytes()} through a {@link
     * ByteArrayCoder}; {@code sizeof()} may not equal that byte count, so treat this as an
     * estimate.
     */
    @Override
    protected long getEncodedElementByteSize(HyperLogLogPlus value) throws IOException {
      if (value == null) {
        throw new CoderException("cannot encode a null HyperLogLogPlus sketch");
      }
      return value.sizeof();
    }
  }

  /**
   * Utility class that provides {@link DoFn}s to retrieve the cardinality from a {@link
   * HyperLogLogPlus} structure in a global or perKey context.
   */
  public static class RetrieveCardinality {

    /** Returns a {@link DoFn} replacing each key's sketch by its estimated cardinality. */
    public static <K> DoFn<KV<K, HyperLogLogPlus>, KV<K, Long>> perKey() {
      return new DoFn<KV<K, HyperLogLogPlus>, KV<K, Long>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          KV<K, HyperLogLogPlus> kv = c.element();
          c.output(KV.of(kv.getKey(), kv.getValue().cardinality()));
        }
      };
    }

    /** Returns a {@link DoFn} replacing each sketch by its estimated cardinality. */
    public static DoFn<HyperLogLogPlus, Long> globally() {
      return new DoFn<HyperLogLogPlus, Long>() {
        @ProcessElement
        public void apply(ProcessContext c) {
          c.output(c.element().cardinality());
        }
      };
    }
  }

  /**
   * Computes the precision based on the desired relative error.
   *
   * <p>According to the HyperLogLog paper, the relative standard error of the estimate is bounded
   * by the following formula:
   *
   * <pre>b(m) / sqrt(m)
   * Where m is the number of buckets used ({@code p = log2(m)})
   * and {@code b(m) < 1.106} for {@code m > 16 (and p > 4)}.
   * </pre>
   *
   * <br>
   * <b>WARNING:</b> <br>
   * This does not mean the relative error of the estimation <b>can't</b> be higher. <br>
   * This only means that on average the relative error will be lower than the desired relative
   * error. <br>
   * Nevertheless, the more elements arrive in the {@link PCollection}, the lower the variation will
   * be. <br>
   * Indeed, this is like when you throw a die millions of times: the relative frequency of each
   * different result <code>{1,2,3,4,5,6}</code> will get closer to {@code 1/6}.
   *
   * @param relativeError the desired relative error, in the interval ]0,1]
   * @return the minimum precision p in order to have the desired relative error on average.
   * @throws IllegalArgumentException if {@code relativeError} is not in ]0,1]
   */
  public static long precisionForRelativeError(double relativeError) {
    // Reject values outside the documented domain; 0 would otherwise yield Long.MAX_VALUE.
    checkArgument(
        relativeError > 0 && relativeError <= 1,
        "Expected: 0 < relativeError <= 1. Actual: relativeError = %s",
        relativeError);
    // Solve 1.106 / sqrt(2^p) <= relativeError for the smallest integer p.
    return Math.round(
        Math.ceil(Math.log(Math.pow(1.106, 2.0) / Math.pow(relativeError, 2.0)) / Math.log(2)));
  }

  /**
   * Returns the relative standard error of the cardinality estimate to expect for precision
   * {@code p}, computed as {@code betaM / sqrt(2^p)}.
   *
   * <p>NOTE(review): the {@code betaM} constants used for {@code p = 4} and {@code p = 5} exceed
   * the {@code 1.106} bound quoted in {@link #precisionForRelativeError(double)}; confirm where
   * these values come from.
   *
   * @param p the precision i.e. the number of bits used for indexing the buckets
   * @return the expected relative standard error for the given value of p, or {@code 1.0} when
   *     {@code p < 4}
   */
  public static double relativeErrorForPrecision(int p) {
    if (p < 4) {
      return 1.0;
    }
    double betaM;
    switch (p) {
      case 4:
        betaM = 1.156;
        break;
      case 5:
        betaM = 1.2;
        break;
      case 6:
        betaM = 1.104;
        break;
      case 7:
        betaM = 1.096;
        break;
      default:
        betaM = 1.05;
        break;
    }
    // Math.pow(2, p) is exact for integer p, unlike exp(p * ln 2).
    return betaM / Math.sqrt(Math.pow(2, p));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java new file mode 100755 index 0000000..2e8d60e --- /dev/null +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Utilities for computing statistical indicators using probabilistic sketches.
+ */ +package org.apache.beam.sdk.extensions.sketching; http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java new file mode 100644 index 0000000..cdbcc45 --- /dev/null +++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinctTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.beam.sdk.extensions.sketching;

import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.extensions.sketching.ApproximateDistinct.ApproximateDistinctFn;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Tests for {@link ApproximateDistinct}. */
@RunWith(JUnit4.class)
public class ApproximateDistinctTest implements Serializable {

  private static final Logger LOG = LoggerFactory.getLogger(ApproximateDistinctTest.class);

  @Rule public final transient TestPipeline tp = TestPipeline.create();

  @Test
  public void smallCardinality() {
    final int smallCard = 1000;
    final int p = 6;
    // NOTE(review): this divides by sqrt(p) rather than sqrt(2^p), so the bound is much looser
    // than the theoretical relative error; confirm that is intended.
    final double expectedErr = 1.104 / Math.sqrt(p);

    List<Integer> small = new ArrayList<>();
    for (int i = 0; i < smallCard; i++) {
      small.add(i);
    }

    PCollection<Long> cardinality =
        tp.apply("small stream", Create.<Integer>of(small))
            .apply("small cardinality", ApproximateDistinct.<Integer>globally().withPrecision(p));

    PAssert.that("Not Accurate Enough", cardinality)
        .satisfies(new VerifyAccuracy(smallCard, expectedErr));

    tp.run();
  }

  @Test
  public void bigCardinality() {
    final int cardinality = 15000;
    final int p = 15;
    final int sp = 20;
    final double expectedErr = 1.04 / Math.sqrt(p);

    // Every value appears twice so that duplicates must not be counted.
    List<Integer> stream = new ArrayList<>();
    for (int i = 1; i <= cardinality; i++) {
      stream.addAll(Collections.nCopies(2, i));
    }
    Collections.shuffle(stream);

    PCollection<Long> res =
        tp.apply("big stream", Create.<Integer>of(stream))
            .apply(
                "big cardinality",
                ApproximateDistinct.<Integer>globally().withPrecision(p).withSparsePrecision(sp));

    PAssert.that("Verify Accuracy for big cardinality", res)
        .satisfies(new VerifyAccuracy(cardinality, expectedErr));

    tp.run();
  }

  @Test
  public void perKey() {
    final int cardinality = 1000;
    final int p = 15;
    final double expectedErr = 1.04 / Math.sqrt(p);

    List<Integer> stream = new ArrayList<>();
    for (int i = 1; i <= cardinality; i++) {
      stream.addAll(Collections.nCopies(2, i));
    }
    Collections.shuffle(stream);

    PCollection<Long> results =
        tp.apply("per key stream", Create.of(stream))
            .apply("create keys", WithKeys.<Integer, Integer>of(1))
            .apply(
                "per key cardinality",
                ApproximateDistinct.<Integer, Integer>perKey().withPrecision(p))
            .apply("extract values", Values.<Long>create());

    PAssert.that("Verify Accuracy for cardinality per key", results)
        .satisfies(new VerifyAccuracy(cardinality, expectedErr));

    tp.run();
  }

  @Test
  public void customObject() {
    final int cardinality = 500;
    final int p = 15;
    final double expectedErr = 1.04 / Math.sqrt(p);

    Schema schema =
        SchemaBuilder.record("User")
            .fields()
            .requiredString("Pseudo")
            .requiredInt("Age")
            .endRecord();
    List<GenericRecord> users = new ArrayList<>();
    for (int i = 1; i <= cardinality; i++) {
      GenericData.Record newRecord = new GenericData.Record(schema);
      newRecord.put("Pseudo", "User" + i);
      newRecord.put("Age", i);
      users.add(newRecord);
    }
    PCollection<Long> results =
        tp.apply("Create stream", Create.of(users).withCoder(AvroCoder.of(schema)))
            .apply(
                "Test custom object",
                ApproximateDistinct.<GenericRecord>globally().withPrecision(p));

    PAssert.that("Verify Accuracy for custom object", results)
        .satisfies(new VerifyAccuracy(cardinality, expectedErr));

    tp.run();
  }

  @Test
  public void testCoder() throws Exception {
    HyperLogLogPlus hllp = new HyperLogLogPlus(12, 18);
    for (int i = 0; i < 10; i++) {
      hllp.offer(i);
    }
    CoderProperties.<HyperLogLogPlus>coderDecodeEncodeEqual(
        ApproximateDistinct.HyperLogLogPlusCoder.of(), hllp);
  }

  @Test
  public void testDisplayData() {
    final ApproximateDistinctFn<Integer> fnWithPrecision =
        ApproximateDistinctFn.create(BigEndianIntegerCoder.of()).withPrecision(23);

    assertThat(DisplayData.from(fnWithPrecision), hasDisplayItem("p", 23));
    assertThat(DisplayData.from(fnWithPrecision), hasDisplayItem("sp", 0));
  }

  @Test(expected = IllegalArgumentException.class)
  public void precisionLowerThanFourIsRejected() {
    ApproximateDistinctFn.create(BigEndianIntegerCoder.of()).withPrecision(3);
  }

  @Test(expected = IllegalArgumentException.class)
  public void sparsePrecisionNotGreaterThanPrecisionIsRejected() {
    // The default precision is 12, so a sparse precision of 12 is invalid.
    ApproximateDistinctFn.create(BigEndianIntegerCoder.of()).withSparseRepresentation(12);
  }

  @Test
  public void precisionForRelativeErrorOfTwoPercent() {
    // 1.106^2 / 0.02^2 ~= 3058, whose log2 rounds up to 12.
    Assert.assertEquals(12L, ApproximateDistinct.precisionForRelativeError(0.02));
  }

  @Test
  public void relativeErrorForPrecisionBelowFourIsOne() {
    Assert.assertEquals(1.0, ApproximateDistinct.relativeErrorForPrecision(3), 0.0);
  }

  /**
   * Asserts that every estimate differs from {@code expectedCard} by a relative error strictly
   * lower than {@code expectedError}.
   */
  static class VerifyAccuracy implements SerializableFunction<Iterable<Long>, Void> {

    private final int expectedCard;

    private final double expectedError;

    VerifyAccuracy(int expectedCard, double expectedError) {
      this.expectedCard = expectedCard;
      this.expectedError = expectedError;
    }

    @Override
    public Void apply(Iterable<Long> input) {
      for (Long estimate : input) {
        // Divide as doubles: with long division any error below 100% truncates to 0, and the
        // accuracy check could essentially never fail.
        double relativeError = Math.abs(estimate - expectedCard) / (double) expectedCard;
        boolean isAccurate = relativeError < expectedError;
        Assert.assertTrue(
            "not accurate enough : \nExpected Cardinality : "
                + expectedCard
                + "\nComputed Cardinality : "
                + estimate,
            isAccurate);
      }
      return null;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fd58a423/sdks/java/javadoc/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/javadoc/pom.xml b/sdks/java/javadoc/pom.xml index 79ac933..85440ff 100644 --- a/sdks/java/javadoc/pom.xml +++ b/sdks/java/javadoc/pom.xml @@ -94,6 +94,11 @@ <dependency> <groupId>org.apache.beam</groupId> + <artifactId>beam-sdks-java-extensions-sketching</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-sorter</artifactId> </dependency>