tilgalas commented on code in PR #31379: URL: https://github.com/apache/beam/pull/31379#discussion_r1638323777
########## sdks/java/extensions/histogram/src/main/java/org/apache/beam/sdk/extensions/histogram/Histogram.java: ########## @@ -0,0 +1,571 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.histogram; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.VarInt; +import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs; +import org.apache.commons.lang3.ArrayUtils; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +/** + * A histogram transform with a combiner that efficiently constructs linear, exponential or explicit + * histograms from large datasets of input data. Bucket bounds can be specified using the {@link + * BucketBounds} class. + */ +public class Histogram { + + private Histogram() { + // do not instantiate + } + + /** + * Returns a {@code PTransform} that takes a {@code PCollection<T>} and returns a {@code + * PCollection<List<Long>>} with a single element per window. The values of this list represent + * the number of elements within each bucket of a histogram, as defined by {@link BucketBounds}. + * The first and the last elements of the list are numbers of elements in underflow and overflow + * buckets. + * + * <p>Example of use: + * + * <pre>{@code + * PCollection<Double> pc = ...; + * PCollection<List<Long>> bucketCounts = + * pc.apply(Histogram.globally(BucketBounds.linear(1.0, 2.0, 100))); + * + * }</pre> + * + * @param <T> the type of the elements in the input {@code PCollection} + * @param bucketBounds the instance of the {@link BucketBounds} class with desired parameters of + * the histogram. 
+ */ + public static <T extends Number> Combine.Globally<T, List<Long>> globally( + BucketBounds bucketBounds) { + return Combine.globally(HistogramCombineFn.create(bucketBounds)); + } + + /** + * Returns a {@code PTransform} that takes a {@code PCollection<KV<K, V>>} and returns a {@code + * PCollection<KV<K, List<Long>>>} that contains an output element mapping each distinct key in + * the input {@code PCollection} to a {@code List}. The values of this list represent the number + * of elements within each bucket of a histogram, as defined by {@link BucketBounds}. The first + * and the last elements of the list are numbers of elements in underflow and overflow buckets. + * + * <p>Example of use: + * + * <pre>{@code + * PCollection<KV<String, Integer>> pc = ...; + * PCollection<KV<String, List<Long>>> bucketCounts = + * pc.apply(Histogram.perKey(BucketBounds.linear(1.0, 2.0, 100))); + * + * }</pre> + * + * @param <K> the type of the keys in the input and output {@code PCollection}s + * @param <V> the type of the values in the input {@code PCollection} + * @param bucketBounds the instance of the {@link BucketBounds} class with desired parameters of + * the histogram. + */ + public static <K, V extends Number> Combine.PerKey<K, V, List<Long>> perKey( + BucketBounds bucketBounds) { + return Combine.perKey(HistogramCombineFn.create(bucketBounds)); + } + + /** + * Defines the bounds for histogram buckets. + * + * <p>Use the provided static factory methods to create new instances of {@link BucketBounds}. + */ + @AutoValue + public abstract static class BucketBounds { + + // Package-private because users should use static factory methods to instantiate new instances. + BucketBounds() {} + + public abstract List<Double> getBounds(); + + public abstract BoundsInclusivity getBoundsInclusivity(); + + /** + * Static factory method for defining bounds of exponential histograms and calculating bounds + * based on the parameters. 
+ * + * <p>For BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE, the list that the + * HistogramCombineFn combiner returns contains the number of elements in the following buckets: + * + * <pre> + * 0th: (-inf, scale) - underflow bucket + * 1st: [scale, scale * growthFactor) + * 2nd: [scale * growthFactor, scale * growthFactor^2) + * ... + * i-th: [scale * growthFactor^(i-1), scale * growthFactor^i) + * ... + * numBoundedBuckets: [scale * growthFactor^(numBoundedBuckets-1), scale * + * growthFactor^numBoundedBuckets) + * numBoundedBuckets + 1: [scale * growthFactor^numBoundedBuckets, +inf) - overflow bucket. + * </pre> + * + * <p>For BoundsInclusivity.LOWER_BOUND_EXCLUSIVE_UPPER_BOUND_INCLUSIVE, the list that the + * HistogramCombineFn combiner returns contains the number of elements in the following buckets: + * + * <pre> + * 0th: (-inf, scale] - underflow bucket + * 1st: (scale, scale * growthFactor] + * 2nd: (scale * growthFactor, scale * growthFactor^2] + * ... + * i-th: (scale * growthFactor^(i-1), scale * growthFactor^i] + * ... + * numBoundedBuckets: (scale * growthFactor^(numBoundedBuckets-1), scale * + * growthFactor^numBoundedBuckets] + * numBoundedBuckets + 1: (scale * growthFactor^numBoundedBuckets, +inf) - overflow bucket. + * </pre> + * + * @param scale the lower bound of the first bounded bucket; must be positive. + * @param growthFactor the factor by which each successive bucket bound is multiplied; must be + * greater than 1.0. + * @param numBoundedBuckets the total number of bounded buckets in the histogram, excluding the + * underflow and overflow buckets. + * @param boundsInclusivity enum value that defines whether the lower or the upper bound of each + * bucket is inclusive.
+ */ + public static BucketBounds exponential( + double scale, + double growthFactor, + int numBoundedBuckets, + BoundsInclusivity boundsInclusivity) { + checkArgument(scale > 0.0, "scale should be positive."); + checkArgument(growthFactor > 1.0, "growth factor should be greater than 1.0."); + checkArgument( + numBoundedBuckets > 0, "number of bounded buckets should be greater than zero."); + checkArgument( + numBoundedBuckets <= Integer.MAX_VALUE - 2, + "number of bounded buckets should be less than max value of integer."); + + ImmutableList.Builder<Double> boundsCalculated = new ImmutableList.Builder<>(); + // The number of bounds is equal to the numBoundedBuckets + 1. + for (int i = 0; i <= numBoundedBuckets; i++) { + double bound = scale * Math.pow(growthFactor, i); + if (Double.isInfinite(bound)) { + throw new IllegalArgumentException("the bound has overflown double type."); + } + boundsCalculated.add(bound); + } + + return new AutoValue_Histogram_BucketBounds(boundsCalculated.build(), boundsInclusivity); + } + + /** + * Like {@link #exponential(double, double, int, BoundsInclusivity)}, but sets + * BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE value for the boundsInclusivity + * parameter. + */ + public static BucketBounds exponential( + double scale, double growthFactor, int numBoundedBuckets) { + return exponential( + scale, + growthFactor, + numBoundedBuckets, + BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE); + } + + /** + * Static factory method for defining bounds of linear histogram and calculating bounds based on + * the parameters. + * + * @param offset value of the lower bound for the first bounded bucket. + * @param width bucket width. + * @param numBoundedBuckets integer determining the total number of bounded buckets within the + * histogram. + * @param boundsInclusivity enum value which defines if lower or upper bounds are + * inclusive/exclusive. 
+ */ + public static BucketBounds linear( + double offset, double width, int numBoundedBuckets, BoundsInclusivity boundsInclusivity) { + checkArgument(width > 0.0, "width of buckets should be positive."); + checkArgument(numBoundedBuckets > 0, "number of bounded buckets should be more than zero."); + checkArgument( + numBoundedBuckets <= Integer.MAX_VALUE - 2, + "number of bounded buckets should be less than max value of integer."); + + ImmutableList.Builder<Double> boundsCalculated = new ImmutableList.Builder<>(); + // The number of bounds is equal to the numBoundedBuckets + 1. + for (int i = 0; i <= numBoundedBuckets; i++) { + double bound = offset + i * width; + if (Double.isInfinite(bound)) { + throw new IllegalArgumentException("the bound has overflown double type."); + } + boundsCalculated.add(bound); + } + + return new AutoValue_Histogram_BucketBounds(boundsCalculated.build(), boundsInclusivity); + } + + /** + * Like {@link #linear(double, double, int, BoundsInclusivity)}, but sets + * BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE value for the boundsInclusivity + * parameter. + */ + public static BucketBounds linear(double offset, double width, int numBoundedBuckets) { + return linear( + offset, + width, + numBoundedBuckets, + BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE); + } + + /** + * Static factory method for defining bounds of explicit histogram. + * + * @param bounds array of explicit bounds of the buckets. + * @param boundsInclusivity enum value which defines if lower or upper bounds are + * inclusive/exclusive. 
+ */ + public static BucketBounds explicit(List<Double> bounds, BoundsInclusivity boundsInclusivity) { + checkNotNull(bounds, "the bounds array should not be null."); + checkArgument(bounds.size() > 0, "the bounds array should not be empty."); + + double prev = bounds.get(0); + for (int i = 1; i < bounds.size(); i++) { + if (prev >= bounds.get(i)) { + throw new IllegalArgumentException( + "bounds should be in ascending order without duplicates."); + } + prev = bounds.get(i); + } + + return new AutoValue_Histogram_BucketBounds(ImmutableList.copyOf(bounds), boundsInclusivity); + } + + /** + * Like {@link #explicit(List, BoundsInclusivity)}, but sets + * BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE value for the boundsInclusivity + * parameter. + */ + public static BucketBounds explicit(List<Double> bounds) { + return explicit(bounds, BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE); + } + } + + /** + * Combiner for calculating histograms. + * + * <p>The HistogramCombineFn class can be used with GroupBy transform to aggregate the input + * values in the KV pair. + * + * <p>Example of use: + * + * <pre>{@code + * PCollection<ParsedMessage> pc = ...; + * PCollection<Row> rows = + * pc.apply(Group.byFieldNames("dimension1", "dimension2").aggregateField("value", + * HistogramCombineFn.<Double>create(BucketBounds.linear(1.0, 2.0, 160)), + * Field.of("bucketCounts", FieldType.array(FieldType.INT64)))); + * + * }</pre> + */ + public static final class HistogramCombineFn<T> + extends Combine.CombineFn<T, HistogramAccumulator, List<Long>> { + + private final double[] bounds; + private final BoundsInclusivity boundsInclusivity; + + private HistogramCombineFn(double[] bounds, BoundsInclusivity boundsInclusivity) { + this.bounds = bounds; + this.boundsInclusivity = boundsInclusivity; + } + + /** + * Returns a histogram combiner with the given {@link BucketBounds}. 
+ * + * @param bucketBounds the instance of the {@link BucketBounds} class with desired parameters of + * the histogram. + */ + public static <T extends Number> HistogramCombineFn<T> create(BucketBounds bucketBounds) { + return new HistogramCombineFn<>( + ArrayUtils.toPrimitive(bucketBounds.getBounds().toArray(new Double[0])), + bucketBounds.getBoundsInclusivity()); + } + + @Override + public HistogramAccumulator createAccumulator() { + return new HistogramAccumulator(bounds.length + 1); + } + + @Override + public HistogramAccumulator addInput(HistogramAccumulator accumulator, T input) + throws IllegalArgumentException { + if (input == null) { + throw new NullPointerException("input should not be null."); + } + + Double doubleValue = ((Number) input).doubleValue(); + if (doubleValue.isNaN() || doubleValue.isInfinite()) { + throw new IllegalArgumentException("input should not be NaN or infinite."); + } + int index = Arrays.binarySearch(bounds, doubleValue); + if (index < 0) { + accumulator.counts[-index - 1]++; + } else { + // This means the value is on bound, can be handled based on the bound inclusivity. 
+ if (boundsInclusivity == BoundsInclusivity.LOWER_BOUND_INCLUSIVE_UPPER_BOUND_EXCLUSIVE) { + accumulator.counts[index + 1]++; + } else { + accumulator.counts[index]++; + } + } + return accumulator; + } + + @Override + public HistogramAccumulator mergeAccumulators( + @UnknownKeyFor @NonNull @Initialized Iterable<HistogramAccumulator> accumulators) { + HistogramAccumulator merged = createAccumulator(); + + for (HistogramAccumulator histogramAccumulator : accumulators) { + checkArgument( + merged.counts.length == histogramAccumulator.counts.length, + "number of buckets in the merging accumulators should be the same."); + for (int i = 0; i < histogramAccumulator.counts.length; ++i) { + merged.counts[i] += histogramAccumulator.counts[i]; + } + } + return merged; + } + + @Override + public List<Long> extractOutput(HistogramAccumulator accumulator) throws NullPointerException { + checkNotNull(accumulator, "can not output from null histogram."); + return Longs.asList(accumulator.counts); + } + + @Override + public @UnknownKeyFor @NonNull @Initialized Coder<HistogramAccumulator> getAccumulatorCoder( + @UnknownKeyFor @NonNull @Initialized CoderRegistry registry, + @UnknownKeyFor @NonNull @Initialized Coder<T> inputCoder) { + return new HistogramAccumulatorCoder(); + } + + @Override + public @UnknownKeyFor @NonNull @Initialized Coder<List<Long>> getDefaultOutputCoder( + @UnknownKeyFor @NonNull @Initialized CoderRegistry registry, + @UnknownKeyFor @NonNull @Initialized Coder<T> inputCoder) { + return ListCoder.of(VarLongCoder.of()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("numBuckets", bounds.length + 1).withLabel("Number of buckets")); + } + } + + /** Accumulator of the Histogram combiner. 
+ */ + static final class HistogramAccumulator { + + private long[] counts; + + public HistogramAccumulator(int numBuckets) { + checkArgument( + numBuckets > 2, + "number of buckets should be greater than two - underflow bucket and overflow bucket."); + this.counts = new long[numBuckets]; + } + + @Override + public boolean equals(@Nullable Object object) { + if (object instanceof HistogramAccumulator) { + HistogramAccumulator other = (HistogramAccumulator) object; + return Arrays.equals(counts, other.counts); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(counts)); + } + } + + /** Coder for {@link HistogramAccumulator}. */ + static final class HistogramAccumulatorCoder extends CustomCoder<HistogramAccumulator> { + + private static final VarLongCoder LONG_CODER = VarLongCoder.of(); + private static final VarIntCoder INT_CODER = VarIntCoder.of(); + + /** + * Index to indicate method used where only non-empty buckets are encoded with their indices. + */ + private static final int NON_EMPTY_BUCKETS_CODER = 0; Review Comment: We can still encode the enum as an integer. My suggestion was mostly cosmetic: making these constants part of a custom named type (an enum), rather than leaving them as bare ints, makes the code clearer and gives each constant an explicit meaning. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org