This is an automated email from the ASF dual-hosted git repository. scott pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new b34b180 [BEAM-5427] Fix and update sample code for CombineFn. (#6439) b34b180 is described below commit b34b18079efb699718bb2503757a6da8ef1f2433 Author: Ruoyun Huang <huan...@gmail.com> AuthorDate: Wed Oct 3 15:14:23 2018 -0700 [BEAM-5427] Fix and update sample code for CombineFn. (#6439) --- .../org/apache/beam/sdk/transforms/Combine.java | 36 +++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 2c65f94..20314fc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -254,10 +254,26 @@ public class Combine { * * <pre>{@code * public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> { - * public static class Accum { + * public static class Accum implements Serializable { * int sum = 0; * int count = 0; + * + * {@literal@}Override + * public boolean equals(Object other) { + * if (other == null) return false; + * if (other == this) return true; + * if (!(other instanceof Accum)) return false; + * + * + * Accum o = (Accum)other; + * if (this.sum != o.sum || this.count != o.count) { + * return false; + * } else { + * return true; + * } + * } * } + * * public Accum createAccumulator() { * return new Accum(); * } @@ -289,6 +305,24 @@ public class Combine { * arbitrary tree structure. Commutativity is required because any order of the input values is * ignored when breaking up input values into groups. * + * <h3>Note on Data Encoding</h3> + * + * <p>Some form of data encoding is required when using custom types in a CombineFn which do not + * have well-known coders. The sample code above uses a custom Accumulator which gets its coder by + * implementing {@link java.io.Serializable}.
By doing this, we are relying on the generic {@link + * org.apache.beam.sdk.coders.CoderProvider}, which is able to provide a coder for any {@link + * java.io.Serializable} if applicable. In cases where {@link java.io.Serializable} is + * inefficient or inapplicable, there are in general two alternatives for encoding: + * + * <ul> + * <li>Default {@link org.apache.beam.sdk.coders.CoderRegistry}. For example, implement a coder + * class explicitly and use the {@code @DefaultCoder} annotation. See the {@link + * org.apache.beam.sdk.coders.CoderRegistry} for the numerous ways in which to bind a type + * to a coder. + * <li>CombineFn-specific way. While extending CombineFn, override both {@link + * #getAccumulatorCoder} and {@link #getDefaultOutputCoder}. + * </ul> + * * @param <InputT> type of input values * @param <AccumT> type of mutable accumulator values * @param <OutputT> type of output values