Repository: incubator-beam Updated Branches: refs/heads/master 70e6a1310 -> 861562239
Add CounterNameAndMetadata to support structured counter name Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bd40bff4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bd40bff4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bd40bff4 Branch: refs/heads/master Commit: bd40bff4e4696a76adac08e89275862e119ec141 Parents: 70e6a13 Author: Pei He <pe...@google.com> Authored: Wed Mar 30 22:38:34 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Tue Apr 19 08:24:33 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/CounterAggregator.java | 2 +- .../apache/beam/sdk/util/common/Counter.java | 84 +++++++--- .../beam/sdk/util/common/CounterName.java | 153 +++++++++++++++++++ .../apache/beam/sdk/util/common/CounterSet.java | 11 +- .../sdk/util/common/worker/StateSampler.java | 2 +- 5 files changed, 220 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java index de0c251..5fd04f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java @@ -87,7 +87,7 @@ public class CounterAggregator<InputT, AccumT, OutputT> implements Aggregator<In @Override public String getName() { - return counter.getName(); + return counter.getFlatName(); } @Override 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index 6cdacc5..6024576 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -107,11 +107,21 @@ public abstract class Counter<T> { * @return the newly constructed Counter * @throws IllegalArgumentException if the aggregation kind is not supported */ - public static Counter<Integer> ints(String name, AggregationKind kind) { + public static Counter<Integer> ints(CounterName name, AggregationKind kind) { return new IntegerCounter(name, kind); } /** + * Constructs a new {@code Counter<Integer>} with an unstructured name. + * + * @deprecated use {@link #ints(CounterName, AggregationKind)}. + */ + @Deprecated + public static Counter<Integer> ints(String name, AggregationKind kind) { + return new IntegerCounter(CounterName.named(name), kind); + } + + /** * Constructs a new {@link Counter} that aggregates {@link Long} values * according to the desired aggregation kind. The supported aggregation kinds * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN}, @@ -122,11 +132,21 @@ public abstract class Counter<T> { * @return the newly constructed Counter * @throws IllegalArgumentException if the aggregation kind is not supported */ - public static Counter<Long> longs(String name, AggregationKind kind) { + public static Counter<Long> longs(CounterName name, AggregationKind kind) { return new LongCounter(name, kind); } /** + * Constructs a new {@code Counter<Long>} with an unstructured name. + * + * @deprecated use {@link #longs(CounterName, AggregationKind)}. 
+ */ + @Deprecated + public static Counter<Long> longs(String name, AggregationKind kind) { + return new LongCounter(CounterName.named(name), kind); + } + + /** * Constructs a new {@link Counter} that aggregates {@link Double} values * according to the desired aggregation kind. The supported aggregation kinds * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN}, @@ -137,11 +157,21 @@ public abstract class Counter<T> { * @return the newly constructed Counter * @throws IllegalArgumentException if the aggregation kind is not supported */ - public static Counter<Double> doubles(String name, AggregationKind kind) { + public static Counter<Double> doubles(CounterName name, AggregationKind kind) { return new DoubleCounter(name, kind); } /** + * Constructs a new {@code Counter<Double>} with an unstructured name. + * + * @deprecated use {@link #doubles(CounterName, AggregationKind)}. + */ + @Deprecated + public static Counter<Double> doubles(String name, AggregationKind kind) { + return new DoubleCounter(CounterName.named(name), kind); + } + + /** * Constructs a new {@link Counter} that aggregates {@link Boolean} values * according to the desired aggregation kind. The only supported aggregation * kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}. @@ -151,26 +181,20 @@ public abstract class Counter<T> { * @return the newly constructed Counter * @throws IllegalArgumentException if the aggregation kind is not supported */ - public static Counter<Boolean> booleans(String name, AggregationKind kind) { + public static Counter<Boolean> booleans(CounterName name, AggregationKind kind) { return new BooleanCounter(name, kind); } /** - * Constructs a new {@link Counter} that aggregates {@link String} values - * according to the desired aggregation kind. The only supported aggregation - * kind is {@link AggregationKind#MIN} and {@link AggregationKind#MAX}. + * Constructs a new {@code Counter<Boolean>} with an unstructured name. 
* - * @param name the name of the new counter - * @param kind the new counter's aggregation kind - * @return the newly constructed Counter - * @throws IllegalArgumentException if the aggregation kind is not supported + * @deprecated use {@link #booleans(CounterName, AggregationKind)}. */ - @SuppressWarnings("unused") - private static Counter<String> strings(String name, AggregationKind kind) { - return new StringCounter(name, kind); + @Deprecated + public static Counter<Boolean> booleans(String name, AggregationKind kind) { + return new BooleanCounter(CounterName.named(name), kind); } - ////////////////////////////////////////////////////////////////////////////// /** @@ -209,10 +233,20 @@ public abstract class Counter<T> { public abstract CounterMean<T> getAndResetMeanDelta(); /** + * Returns the counter's flat name. + */ + public String getFlatName() { + return name.getFlatName(); + } + + /** * Returns the counter's name. + * + * @deprecated use {@link #getFlatName}. */ + @Deprecated public String getName() { - return name; + return name.getFlatName(); } /** @@ -267,7 +301,7 @@ public abstract class Counter<T> { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(getName()); + sb.append(getFlatName()); sb.append(":"); sb.append(getKind()); sb.append("("); @@ -342,13 +376,13 @@ public abstract class Counter<T> { ////////////////////////////////////////////////////////////////////////////// - /** The name of this counter. */ - protected final String name; + /** The name and metadata of this counter. */ + protected final CounterName name; /** The kind of aggregation function to apply to this counter. 
*/ protected final AggregationKind kind; - protected Counter(String name, AggregationKind kind) { + protected Counter(CounterName name, AggregationKind kind) { this.name = name; this.kind = kind; } @@ -365,7 +399,7 @@ public abstract class Counter<T> { private final AtomicReference<LongCounterMean> deltaMean; /** Initializes a new {@link Counter} for {@link Long} values. */ - private LongCounter(String name, AggregationKind kind) { + private LongCounter(CounterName name, AggregationKind kind) { super(name, kind); switch (kind) { case MEAN: @@ -560,7 +594,7 @@ public abstract class Counter<T> { AtomicReference<DoubleCounterMean> deltaMean; /** Initializes a new {@link Counter} for {@link Double} values. */ - private DoubleCounter(String name, AggregationKind kind) { + private DoubleCounter(CounterName name, AggregationKind kind) { super(name, kind); switch (kind) { case MEAN: @@ -753,7 +787,7 @@ public abstract class Counter<T> { private final AtomicBoolean deltaAggregate; /** Initializes a new {@link Counter} for {@link Boolean} values. */ - private BooleanCounter(String name, AggregationKind kind) { + private BooleanCounter(CounterName name, AggregationKind kind) { super(name, kind); aggregate = new AtomicBoolean(); deltaAggregate = new AtomicBoolean(); @@ -825,7 +859,7 @@ public abstract class Counter<T> { */ private static class StringCounter extends Counter<String> { /** Initializes a new {@link Counter} for {@link String} values. */ - private StringCounter(String name, AggregationKind kind) { + private StringCounter(CounterName name, AggregationKind kind) { super(name, kind); // TODO: Support MIN, MAX of Strings. throw illegalArgumentException(); @@ -908,7 +942,7 @@ public abstract class Counter<T> { private final AtomicReference<IntegerCounterMean> deltaMean; /** Initializes a new {@link Counter} for {@link Integer} values. 
*/ - private IntegerCounter(String name, AggregationKind kind) { + private IntegerCounter(CounterName name, AggregationKind kind) { super(name, kind); switch (kind) { case MEAN: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java new file mode 100644 index 0000000..b46be98 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.common; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.Strings; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * The name of a counter identifies the user-specified name, as well as the origin, + * the step the counter is associated with, and a prefix to add to the name. 
+ * + * <p>For backwards compatibility, the {@link CounterName} will be converted to + * a flat name (string) during the migration. + */ +public class CounterName { + /** + * Returns a {@link CounterName} with the given name and an empty origin, step name and prefix. + */ + public static CounterName named(String name) { + return new CounterName(name, "", "", ""); + } + + /** + * Returns a {@link CounterName} whose name is the given name with "-msecs" appended. + */ + public static CounterName msecs(String name) { + return named(name + "-msecs"); + } + + /** + * Returns a new {@link CounterName} identical to this one, but with the given origin. + */ + public CounterName withOrigin(String origin) { + return new CounterName(this.name, origin, this.stepName, this.prefix); + } + + /** + * Returns a new {@link CounterName} identical to this one, but with the given step name. + */ + public CounterName withStepName(String stepName) { + return new CounterName(this.name, this.origin, stepName, this.prefix); + } + + /** + * Returns a new {@link CounterName} identical to this one, but with the given prefix. + */ + public CounterName withPrefix(String prefix) { + return new CounterName(this.name, this.origin, this.stepName, prefix); + } + + /** + * Name of the counter; never null. + * + * <p>For example, process-msecs, ElementCount. + */ + private final String name; + + /** + * Origin (namespace) of the counter name; never null. + * + * <p>For example, "user" for user-defined counters. + * It is empty for counters defined by the SDK or the runner. + */ + private final String origin; + + /** + * System-defined step name or the named output of a step; never null. + * + * <p>For example, {@code s1} or {@code s2.out}. + * It may be empty when the counter is not associated with a step name. + */ + private final String stepName; + + /** + * Prefix of a group of counters; never null. + * + * <p>It is empty when counters don't have general prefixes. + */ + private final String prefix; + + /** + * Cached flat name, i.e. the equivalent unstructured name; equals and hashCode are based on it. + * + * <p>The reference holds null until {@link #getFlatName()} is first called. 
+ */ + private AtomicReference<String> flatName; + + private CounterName(String name, String origin, String stepName, String prefix) { + this.name = checkNotNull(name, "name"); + this.origin = checkNotNull(origin, "origin"); + this.stepName = checkNotNull(stepName, "stepName"); + this.prefix = checkNotNull(prefix, "prefix"); + this.flatName = new AtomicReference<String>(); + } + + /** + * Returns the flat name of this structured counter, computing and caching it on the first call. + */ + public String getFlatName() { + String ret = flatName.get(); + if (ret == null) { + StringBuilder sb = new StringBuilder(); + if (!Strings.isNullOrEmpty(prefix)) { + // No "-" is appended after the prefix: not all runner versions join it with "-", and it may already contain one. + sb.append(prefix); + } + if (!Strings.isNullOrEmpty(origin)) { + sb.append(origin + "-"); + } + if (!Strings.isNullOrEmpty(stepName)) { + sb.append(stepName + "-"); + } + sb.append(name); + flatName.compareAndSet(null, sb.toString()); + ret = flatName.get(); + } + return ret; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } else if (o instanceof CounterName) { + CounterName that = (CounterName) o; + return this.getFlatName().equals(that.getFlatName()); + } + return false; + } + + @Override + public int hashCode() { + return getFlatName().hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java index 699d7d3..cb0ffe5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java @@ -74,10 +74,11 @@ public class CounterSet extends AbstractSet<Counter<?>> { * name but an incompatible kind 
had already been added */ public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) { - Counter<?> oldCounter = counters.get(counter.getName()); + String flatName = counter.getFlatName(); + Counter<?> oldCounter = counters.get(flatName); if (oldCounter == null) { // A new counter. - counters.put(counter.getName(), counter); + counters.put(flatName, counter); return counter; } if (counter.isCompatibleWith(oldCounter)) { @@ -125,16 +126,16 @@ public class CounterSet extends AbstractSet<Counter<?>> { if (null == e) { return false; } - if (counters.containsKey(e.getName())) { + if (counters.containsKey(e.getFlatName())) { return false; } - counters.put(e.getName(), e); + counters.put(e.getFlatName(), e); return true; } public synchronized void merge(CounterSet that) { for (Counter<?> theirCounter : that) { - Counter<?> myCounter = counters.get(theirCounter.getName()); + Counter<?> myCounter = counters.get(theirCounter.getFlatName()); if (myCounter != null) { mergeCounters(myCounter, theirCounter); } else { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java index bc16bdd..ee95260 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java @@ -262,7 +262,7 @@ public class StateSampler implements AutoCloseable { */ public synchronized StateSamplerInfo getInfo() { return currentState == DO_NOT_SAMPLE ? 
null - : new StateSamplerInfo(countersByState.get(currentState).getName(), + : new StateSamplerInfo(countersByState.get(currentState).getFlatName(), stateTransitionCount, null); }