Use an int sequence instead of a random UUID for Aggregator IDs

Aggregator IDs are used to ensure that an Aggregator's identity is consistent across synchronization barriers. This is only relevant when constructing the map of Step -> Aggregator to enable querying, as the DoFns represented within the graph may be serialized. The identity has no impact on the interaction between the runner and the aggregator, which is the responsibility of the ProcessContext object and setupDelegateAggregators.
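Why an explicit ID matters once DoFns are serialized can be sketched in plain Java. This is not Beam's code: `IdentifiedAggregator`, `roundTrip`, and the step-name map are hypothetical, and the sketch assumes equality is defined on the id alone. A copy produced by Java serialization is a different object, yet it still finds the entry that was registered for the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class AggregatorIdentityDemo {
  // Hypothetical stand-in for an aggregator; not Beam's DelegatingAggregator.
  static final class IdentifiedAggregator implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final AtomicInteger ID_GEN = new AtomicInteger();

    private final int id;
    private final String name;

    IdentifiedAggregator(String name) {
      this.id = ID_GEN.getAndIncrement();
      this.name = name;
    }

    // Identity is defined by the id, which survives serialization,
    // rather than by object reference, which does not.
    @Override
    public boolean equals(Object o) {
      return o instanceof IdentifiedAggregator && ((IdentifiedAggregator) o).id == id;
    }

    @Override
    public int hashCode() {
      return Integer.hashCode(id);
    }
  }

  // Round-trips a value through Java serialization, as may happen to a DoFn.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T value)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    IdentifiedAggregator original = new IdentifiedAggregator("elementCount");
    // Hypothetical Aggregator -> step map built at graph construction time.
    Map<IdentifiedAggregator, String> stepsByAggregator = new HashMap<>();
    stepsByAggregator.put(original, "ParDo(CountElements)");

    IdentifiedAggregator copy = roundTrip(original);
    System.out.println(copy != original);            // true: a distinct object
    System.out.println(stepsByAggregator.get(copy)); // ParDo(CountElements)
  }
}
```

Deserialization restores the `final int id` field without running the constructor, so the copy keeps its original id and the static counter is not advanced.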
UUID#randomUUID uses a shared SecureRandom to create the bytes of the UUID; SecureRandom#nextBytes is a synchronized method, so regardless of the underlying source of randomness, only one random UUID can be generated at a time. Instead, use an atomically increasing int to identify aggregators. This should be sufficient for user-created aggregators, and system aggregators should not care about the id.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd854b1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd854b1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd854b1a

Branch: refs/heads/apex-runner
Commit: dd854b1a71770b9b452361e0d92e018b65f1b3e8
Parents: f2ec824
Author: Thomas Groh <tg...@google.com>
Authored: Thu Oct 27 10:19:03 2016 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Thu Oct 27 16:28:32 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DelegatingAggregator.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd854b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
index d92bb71..e03d3b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.util.Objects;
-import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 
 /**
@@ -37,7 +37,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  */
 class DelegatingAggregator<AggInputT, AggOutputT>
     implements Aggregator<AggInputT, AggOutputT>, Serializable {
-  private final UUID id;
+  private static final AtomicInteger ID_GEN = new AtomicInteger();
+  private final int id;
 
   private final String name;
 
@@ -47,7 +48,7 @@ class DelegatingAggregator<AggInputT, AggOutputT>
 
   public DelegatingAggregator(String name,
       CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    this.id = UUID.randomUUID();
+    this.id = ID_GEN.getAndIncrement();
     this.name = checkNotNull(name, "name cannot be null");
     // Safe contravariant cast
     @SuppressWarnings("unchecked")
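The replacement scheme can be sketched with only the JDK: a single static `AtomicInteger`, like the `ID_GEN` field in the diff, handed out from several threads at once. `AtomicIdDemo`, `nextId`, and `generateConcurrently` are hypothetical names for illustration. Obtaining an id takes no lock, unlike `UUID.randomUUID()`, yet every id is distinct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIdDemo {
  // Shared id source, mirroring the ID_GEN field in the diff.
  private static final AtomicInteger ID_GEN = new AtomicInteger();

  static int nextId() {
    // getAndIncrement is one atomic read-modify-write, so concurrent callers
    // never receive the same value (until the int wraps after 2^32 calls).
    return ID_GEN.getAndIncrement();
  }

  // Draws idsPerThread ids on each of `threads` threads and collects them all.
  static Set<Integer> generateConcurrently(int threads, int idsPerThread) throws Exception {
    Set<Integer> seen = ConcurrentHashMap.newKeySet();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int t = 0; t < threads; t++) {
        futures.add(pool.submit(() -> {
          for (int i = 0; i < idsPerThread; i++) {
            seen.add(nextId());
          }
        }));
      }
      for (Future<?> f : futures) {
        f.get(); // propagate any failure from a worker
      }
    } finally {
      pool.shutdown();
      pool.awaitTermination(10, TimeUnit.SECONDS);
    }
    return seen;
  }

  public static void main(String[] args) throws Exception {
    Set<Integer> ids = generateConcurrently(8, 10_000);
    // Every call produced a distinct id: 8 * 10_000 = 80000.
    System.out.println(ids.size());
  }
}
```

The trade-off is that such ids are unique only within one JVM (one loaded copy of the class) and would repeat after 2^32 allocations; the commit message judges this sufficient for user-created aggregators.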