This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit cc9182a6b257726b13b69073aa321da1ad4d6c37 Author: ArnaudFnr <arnaudfournier...@gmail.com> AuthorDate: Fri Jan 12 01:41:34 2018 +0100 Make Sketch AutoValue + Javadoc update --- .../extensions/sketching/ApproximateDistinct.java | 9 +- .../extensions/sketching/SketchFrequencies.java | 126 ++++++++++----------- .../sketching/SketchFrequenciesTest.java | 14 +-- 3 files changed, 70 insertions(+), 79 deletions(-) diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java index 80446a0..3fea951 100644 --- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/ApproximateDistinct.java @@ -231,7 +231,7 @@ public final class ApproximateDistinct { /** * Implementation of {@link #globally()}. * - * @param <InputT> + * @param <InputT> the type of the elements in the input {@link PCollection} */ @AutoValue public abstract static class GloballyDistinct<InputT> @@ -282,8 +282,8 @@ public final class ApproximateDistinct { /** * Implementation of {@link #perKey()}. * - * @param <K> - * @param <V> + * @param <K> type of the keys mapping the elements + * @param <V> type of the values being combined per key */ @AutoValue public abstract static class PerKeyDistinct<K, V> @@ -360,7 +360,8 @@ public final class ApproximateDistinct { try { coder.verifyDeterministic(); } catch (Coder.NonDeterministicException e) { - throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e); + throw new IllegalArgumentException("Coder must be deterministic to perform this sketch." + + e.getMessage(), e); } return new ApproximateDistinctFn<>(12, 0, coder); } diff --git a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java index 6508333..bb62053 100644 --- a/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java +++ b/sdks/java/extensions/sketching/src/main/java/org/apache/beam/sdk/extensions/sketching/SketchFrequencies.java @@ -82,8 +82,8 @@ import org.apache.beam.sdk.values.PCollection; * * <p>With the default values, this gives a depth of 200 and a width of 10. * - * <p><b>WARNING:</b> The relative error concerns the total number of distinct number elements - * in the stream. Thus, an element having 1000 occurrences in a streams of 1 million distinct + * <p><b>WARNING:</b> The relative error concerns the total number of distinct elements + * in a stream. Thus, an element having 1000 occurrences in a stream of 1 million distinct * elements will have 1% of 1 million as relative error, i.e. 10 000. This means the frequency * is 1000 +/- 10 000 for this element. Therefore this is obvious that the relative error must * be really low in very large streams. <br> @@ -154,14 +154,14 @@ import org.apache.beam.sdk.values.PCollection; * {@literal new DoFn<Long, KV<MyObject, Long>>()} { * {@literal @ProcessElement} * public void procesElement(ProcessContext c) { - * Long object = c.element(); + * Long elem = c.element(); * CountMinSketch sketch = c.sideInput(sketchView); * sketch.estimateCount(elem, coder); * }}).withSideInputs(sketchView)); * } * </code></pre> * - * <h3>Example 4 : Using the CombineFn</h3> + * <h3>Example 4: Using the CombineFn</h3> * * <p>The {@code CombineFn} does the same thing as the {@code PTransform}s but * it can be used for doing stateful processing or in @@ -215,7 +215,7 @@ public final class SketchFrequencies { /** * Implementation of {@link #globally()}. * - * @param <InputT> + * @param <InputT> the type of the elements in the input {@link PCollection} */ @AutoValue public abstract static class GlobalSketch<InputT> @@ -255,15 +255,15 @@ public final class SketchFrequencies { return input.apply("Compute Count-Min Sketch", Combine.<InputT, Sketch<InputT>>globally(CountMinSketchFn .<InputT>create(input.getCoder()) - .withAccuracy(this.relativeError(), this.confidence()))); + .withAccuracy(relativeError(), confidence()))); } } /** * Implementation of {@link #perKey()}. * - * @param <K> - * @param <V> + * @param <K> type of the keys mapping the elements + * @param <V> type of the values being combined per key */ @AutoValue public abstract static class PerKeySketch<K, V> @@ -277,7 +277,7 @@ public final class SketchFrequencies { static <K, V> Builder<K, V> builder() { return new AutoValue_SketchFrequencies_PerKeySketch.Builder<K, V>() - .setRelativeError(0.001) + .setRelativeError(0.01) .setConfidence(0.999); } @@ -304,7 +304,7 @@ public final class SketchFrequencies { return input.apply("Compute Count-Min Sketch perKey", Combine.<K, V, Sketch<V>>perKey(CountMinSketchFn .<V>create(inputCoder.getValueCoder()) - .withAccuracy(this.relativeError(), this.confidence()))); + .withAccuracy(relativeError(), confidence()))); } } @@ -331,15 +331,17 @@ public final class SketchFrequencies { } /** - * Returns an {@link CountMinSketchFn} combiner with the given input coder. + * Returns an {@link CountMinSketchFn} combiner with the given input coder. <br> + * <b>Warning :</b> the coder must be deterministic. * * @param coder the coder that encodes the elements' type */ - public static <InputT> CountMinSketchFn<InputT>create(Coder<InputT> coder) { + public static <InputT> CountMinSketchFn<InputT> create(Coder<InputT> coder) { try { coder.verifyDeterministic(); } catch (Coder.NonDeterministicException e) { - throw new IllegalArgumentException("Coder is not deterministic ! " + e.getMessage(), e); + throw new IllegalArgumentException("Coder must be deterministic to perform this sketch." + + e.getMessage(), e); } return new CountMinSketchFn<>(coder, 0.01, 0.999); } @@ -360,13 +362,13 @@ public final class SketchFrequencies { } if (confidence <= 0D || confidence >= 1D) { - throw new IllegalArgumentException("The confidence must be comprised between 0 and 1"); + throw new IllegalArgumentException("The confidence must be between 0 and 1"); } - return new CountMinSketchFn<InputT>(this.inputCoder, epsilon, confidence); + return new CountMinSketchFn<InputT>(inputCoder, epsilon, confidence); } @Override public Sketch<InputT> createAccumulator() { - return new Sketch<InputT>(this.epsilon, this.confidence); + return Sketch.<InputT>create(epsilon, confidence); } @Override public Sketch<InputT> addInput(Sketch<InputT> accumulator, InputT element) { @@ -378,18 +380,17 @@ public final class SketchFrequencies { @Override public Sketch<InputT> mergeAccumulators(Iterable<Sketch<InputT>> accumulators) { Iterator<Sketch<InputT>> it = accumulators.iterator(); Sketch<InputT> first = it.next(); - CountMinSketch mergedSketches = first.sketch; + CountMinSketch mergedSketches = first.sketch(); try { while (it.hasNext()) { - mergedSketches = CountMinSketch.merge(mergedSketches, it.next().sketch); + mergedSketches = CountMinSketch.merge(mergedSketches, it.next().sketch()); } } catch (FrequencyMergeException e) { // Should never happen because every instantiated accumulator are of the same type. - throw new IllegalStateException("The accumulators cannot be merged !" + e.getMessage()); + throw new IllegalStateException("The accumulators cannot be merged:" + e.getMessage()); } - first.sketch = mergedSketches; - return first; + return Sketch.<InputT>create(mergedSketches); } /** Output the whole structure so it can be queried, reused or stored easily. */ @@ -419,30 +420,46 @@ public final class SketchFrequencies { } /** - * Wrapper of StreamLib's Count-Min Sketch to fit with Beam requirements. + * Wrap StreamLib's Count-Min Sketch to support counting all user types by hashing + * the encoded user type using the supplied deterministic coder. This is required + * since objects in Apache Beam are considered equal if their encodings are equal. */ - public static class Sketch<T> implements Serializable { + @AutoValue + public abstract static class Sketch<T> implements Serializable { static final int SEED = 123456; - int width; - int depth; - CountMinSketch sketch; + static <T> Sketch<T> create(double eps, double conf) { + int width = (int) Math.ceil(2 / eps); + int depth = (int) Math.ceil(-Math.log(1 - conf) / Math.log(2)); + return new AutoValue_SketchFrequencies_Sketch<T>( + depth, + width, + new CountMinSketch(depth, width, SEED)); + } - public Sketch(double eps, double confidence) { - this.width = (int) Math.ceil(2 / eps); - this.depth = (int) Math.ceil(-Math.log(1 - confidence) / Math.log(2)); - sketch = new CountMinSketch(depth, width, SEED); + static <T> Sketch<T> create(int depth, int width, CountMinSketch sketch) { + return new AutoValue_SketchFrequencies_Sketch<T>( + depth, + width, + sketch); } - private Sketch(int width, int depth, CountMinSketch sketch) { - this.sketch = sketch; - this.width = width; - this.depth = depth; + static <T> Sketch<T> create(CountMinSketch sketch) { + int width = (int) Math.ceil(2 / sketch.getRelativeError()); + int depth = (int) Math.ceil(-Math.log(1 - sketch.getConfidence()) / Math.log(2)); + return new AutoValue_SketchFrequencies_Sketch<T>( + depth, + width, + sketch); } + abstract int depth(); + abstract int width(); + abstract CountMinSketch sketch(); + public void add(T element, long count, Coder<T> coder) { - sketch.add(hashElement(element, coder), count); + sketch().add(hashElement(element, coder), count); } public void add(T element, Coder<T> coder) { @@ -458,41 +475,14 @@ public final class SketchFrequencies { } } - public int getWidth() { - return this.width; - } - - public int getDepth() { - return this.depth; - } - /** * Utility class to retrieve the estimate frequency of an element from a {@link * CountMinSketch}. */ public long estimateCount(T element, Coder<T> coder) { - return sketch.estimateCount(hashElement(element, coder)); + return sketch().estimateCount(hashElement(element, coder)); } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - final Sketch<T> other = (Sketch<T>) o; - - if (depth != other.depth) { - return false; - } - if (width != other.width) { - return false; - } - return sketch.equals(other.sketch); - } } /** @@ -509,9 +499,9 @@ public final class SketchFrequencies { if (value == null) { throw new CoderException("cannot encode a null Count-min Sketch"); } - INT_CODER.encode(value.width, outStream); - INT_CODER.encode(value.depth, outStream); - BYTE_ARRAY_CODER.encode(CountMinSketch.serialize(value.sketch), outStream); + INT_CODER.encode(value.width(), outStream); + INT_CODER.encode(value.depth(), outStream); + BYTE_ARRAY_CODER.encode(CountMinSketch.serialize(value.sketch()), outStream); } @Override @@ -520,7 +510,7 @@ public final class SketchFrequencies { int depth = INT_CODER.decode(inStream); byte[] sketchBytes = BYTE_ARRAY_CODER.decode(inStream); CountMinSketch sketch = CountMinSketch.deserialize(sketchBytes); - return new Sketch<T>(width, depth, sketch); + return Sketch.<T>create(depth, width, sketch); } @Override @@ -537,7 +527,7 @@ public final class SketchFrequencies { // 4L * 4 is for depth and width (ints) in Sketch<T> and in the Count-Min sketch // 8L * depth * (width + 1) is a factorization for the sizes of table (long[depth][width]) // and hashA (long[depth]) - return 8L + 4L * 4 + 8L * value.getDepth() * (value.getWidth() + 1); + return 8L + 4L * 4 + 8L * value.depth() * (value.width() + 1); } } } diff --git a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java index ea773e6..34d9ed1 100644 --- a/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java +++ b/sdks/java/extensions/sketching/src/test/java/org/apache/beam/sdk/extensions/sketching/SketchFrequenciesTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.extensions.sketching; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; import java.io.Serializable; import java.util.ArrayList; @@ -43,7 +44,6 @@ import org.apache.beam.sdk.transforms.Values; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -115,7 +115,7 @@ public class SketchFrequenciesTest implements Serializable { // n sketches each containing [0, 1, 2] for (int i = 0; i < nOccurrences; i++) { - Sketch<Integer> sketch = new Sketch<Integer>(eps, conf); + Sketch<Integer> sketch = Sketch.<Integer>create(eps, conf); for (int j = 0; j < size; j++) { sketch.add(j, coder); } @@ -125,7 +125,7 @@ public class SketchFrequenciesTest implements Serializable { CountMinSketchFn<Integer> fn = CountMinSketchFn.create(coder).withAccuracy(eps, conf); Sketch<Integer> merged = fn.mergeAccumulators(sketches); for (int i = 0; i < size; i++) { - Assert.assertEquals(nOccurrences, merged.estimateCount(i, coder)); + assertEquals(nOccurrences, merged.estimateCount(i, coder)); } } @@ -135,7 +135,7 @@ public class SketchFrequenciesTest implements Serializable { long occurrences = 2L; // occurrence of each user in the stream double eps = 0.01; double conf = 0.8; - Sketch<GenericRecord> sketch = new Sketch<>(eps, conf); + Sketch<GenericRecord> sketch = Sketch.<GenericRecord>create(eps, conf); Schema schema = SchemaBuilder.record("User") .fields() @@ -149,13 +149,13 @@ public class SketchFrequenciesTest implements Serializable { newRecord.put("Pseudo", "User" + i); newRecord.put("Age", i); sketch.add(newRecord, occurrences, coder); - Assert.assertEquals("Test API", occurrences, sketch.estimateCount(newRecord, coder)); + assertEquals("Test API", occurrences, sketch.estimateCount(newRecord, coder)); } } @Test public void testCoder() throws Exception { - Sketch<Integer> cMSketch = new Sketch<Integer>(0.01, 0.8); + Sketch<Integer> cMSketch = Sketch.<Integer>create(0.01, 0.8); Coder<Integer> coder = VarIntCoder.of(); for (int i = 0; i < 3; i++) { cMSketch.add(i, coder); @@ -196,7 +196,7 @@ public class SketchFrequenciesTest implements Serializable { @Override public Void apply(Sketch<T> sketch) { for (int i = 0; i < elements.length; i++) { - Assert.assertEquals((long) expectedHits[i], sketch.estimateCount(elements[i], coder)); + assertEquals((long) expectedHits[i], sketch.estimateCount(elements[i], coder)); } return null; } -- To stop receiving notification emails like this one, please contact lc...@apache.org.