Repository: incubator-beam
Updated Branches:
  refs/heads/master 70e6a1310 -> 861562239


Add CounterNameAndMetadata to support structured counter name


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bd40bff4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bd40bff4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bd40bff4

Branch: refs/heads/master
Commit: bd40bff4e4696a76adac08e89275862e119ec141
Parents: 70e6a13
Author: Pei He <pe...@google.com>
Authored: Wed Mar 30 22:38:34 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Tue Apr 19 08:24:33 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/CounterAggregator.java |   2 +-
 .../apache/beam/sdk/util/common/Counter.java    |  84 +++++++---
 .../beam/sdk/util/common/CounterName.java       | 153 +++++++++++++++++++
 .../apache/beam/sdk/util/common/CounterSet.java |  11 +-
 .../sdk/util/common/worker/StateSampler.java    |   2 +-
 5 files changed, 220 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
index de0c251..5fd04f5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CounterAggregator.java
@@ -87,7 +87,7 @@ public class CounterAggregator<InputT, AccumT, OutputT> 
implements Aggregator<In
 
   @Override
   public String getName() {
-    return counter.getName();
+    return counter.getFlatName();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
index 6cdacc5..6024576 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
@@ -107,11 +107,21 @@ public abstract class Counter<T> {
    * @return the newly constructed Counter
    * @throws IllegalArgumentException if the aggregation kind is not supported
    */
-  public static Counter<Integer> ints(String name, AggregationKind kind) {
+  public static Counter<Integer> ints(CounterName name, AggregationKind kind) {
     return new IntegerCounter(name, kind);
   }
 
   /**
+   * Constructs a new {@code Counter<Integer>} with an unstructured name.
+   *
+   * @deprecated use {@link #ints(CounterName, AggregationKind)}.
+   */
+  @Deprecated
+  public static Counter<Integer> ints(String name, AggregationKind kind) {
+    return new IntegerCounter(CounterName.named(name), kind);
+  }
+
+  /**
    * Constructs a new {@link Counter} that aggregates {@link Long} values
    * according to the desired aggregation kind. The supported aggregation kinds
    * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
@@ -122,11 +132,21 @@ public abstract class Counter<T> {
    * @return the newly constructed Counter
    * @throws IllegalArgumentException if the aggregation kind is not supported
    */
-  public static Counter<Long> longs(String name, AggregationKind kind) {
+  public static Counter<Long> longs(CounterName name, AggregationKind kind) {
     return new LongCounter(name, kind);
   }
 
   /**
+   * Constructs a new {@code Counter<Long>} with an unstructured name.
+   *
+   * @deprecated use {@link #longs(CounterName, AggregationKind)}.
+   */
+  @Deprecated
+  public static Counter<Long> longs(String name, AggregationKind kind) {
+    return new LongCounter(CounterName.named(name), kind);
+  }
+
+  /**
    * Constructs a new {@link Counter} that aggregates {@link Double} values
    * according to the desired aggregation kind. The supported aggregation kinds
    * are {@link AggregationKind#SUM}, {@link AggregationKind#MIN},
@@ -137,11 +157,21 @@ public abstract class Counter<T> {
    * @return the newly constructed Counter
    * @throws IllegalArgumentException if the aggregation kind is not supported
    */
-  public static Counter<Double> doubles(String name, AggregationKind kind) {
+  public static Counter<Double> doubles(CounterName name, AggregationKind 
kind) {
     return new DoubleCounter(name, kind);
   }
 
   /**
+   * Constructs a new {@code Counter<Double>} with an unstructured name.
+   *
+   * @deprecated use {@link #doubles(CounterName, AggregationKind)}.
+   */
+  @Deprecated
+  public static Counter<Double> doubles(String name, AggregationKind kind) {
+    return new DoubleCounter(CounterName.named(name), kind);
+  }
+
+  /**
    * Constructs a new {@link Counter} that aggregates {@link Boolean} values
    * according to the desired aggregation kind. The only supported aggregation
    * kinds are {@link AggregationKind#AND} and {@link AggregationKind#OR}.
@@ -151,26 +181,20 @@ public abstract class Counter<T> {
    * @return the newly constructed Counter
    * @throws IllegalArgumentException if the aggregation kind is not supported
    */
-  public static Counter<Boolean> booleans(String name, AggregationKind kind) {
+  public static Counter<Boolean> booleans(CounterName name, AggregationKind 
kind) {
     return new BooleanCounter(name, kind);
   }
 
   /**
-   * Constructs a new {@link Counter} that aggregates {@link String} values
-   * according to the desired aggregation kind. The only supported aggregation
-   * kind is {@link AggregationKind#MIN} and {@link AggregationKind#MAX}.
+   * Constructs a new {@code Counter<Boolean>} with an unstructured name.
    *
-   * @param name the name of the new counter
-   * @param kind the new counter's aggregation kind
-   * @return the newly constructed Counter
-   * @throws IllegalArgumentException if the aggregation kind is not supported
+   * @deprecated use {@link #booleans(CounterName, AggregationKind)}.
    */
-  @SuppressWarnings("unused")
-  private static Counter<String> strings(String name, AggregationKind kind) {
-    return new StringCounter(name, kind);
+  @Deprecated
+  public static Counter<Boolean> booleans(String name, AggregationKind kind) {
+    return new BooleanCounter(CounterName.named(name), kind);
   }
 
-
   
//////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -209,10 +233,20 @@ public abstract class Counter<T> {
   public abstract CounterMean<T> getAndResetMeanDelta();
 
   /**
+   * Returns the counter's flat name.
+   */
+  public String getFlatName() {
+    return name.getFlatName();
+  }
+
+  /**
    * Returns the counter's name.
+   *
+   * @deprecated use {@link #getFlatName}.
    */
+  @Deprecated
   public String getName() {
-    return name;
+    return name.getFlatName();
   }
 
   /**
@@ -267,7 +301,7 @@ public abstract class Counter<T> {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append(getName());
+    sb.append(getFlatName());
     sb.append(":");
     sb.append(getKind());
     sb.append("(");
@@ -342,13 +376,13 @@ public abstract class Counter<T> {
 
   
//////////////////////////////////////////////////////////////////////////////
 
-  /** The name of this counter. */
-  protected final String name;
+  /** The name and metadata of this counter. */
+  protected final CounterName name;
 
   /** The kind of aggregation function to apply to this counter. */
   protected final AggregationKind kind;
 
-  protected Counter(String name, AggregationKind kind) {
+  protected Counter(CounterName name, AggregationKind kind) {
     this.name = name;
     this.kind = kind;
   }
@@ -365,7 +399,7 @@ public abstract class Counter<T> {
     private final AtomicReference<LongCounterMean> deltaMean;
 
     /** Initializes a new {@link Counter} for {@link Long} values. */
-    private LongCounter(String name, AggregationKind kind) {
+    private LongCounter(CounterName name, AggregationKind kind) {
       super(name, kind);
       switch (kind) {
         case MEAN:
@@ -560,7 +594,7 @@ public abstract class Counter<T> {
     AtomicReference<DoubleCounterMean> deltaMean;
 
     /** Initializes a new {@link Counter} for {@link Double} values. */
-    private DoubleCounter(String name, AggregationKind kind) {
+    private DoubleCounter(CounterName name, AggregationKind kind) {
       super(name, kind);
       switch (kind) {
         case MEAN:
@@ -753,7 +787,7 @@ public abstract class Counter<T> {
     private final AtomicBoolean deltaAggregate;
 
     /** Initializes a new {@link Counter} for {@link Boolean} values. */
-    private BooleanCounter(String name, AggregationKind kind) {
+    private BooleanCounter(CounterName name, AggregationKind kind) {
       super(name, kind);
       aggregate = new AtomicBoolean();
       deltaAggregate = new AtomicBoolean();
@@ -825,7 +859,7 @@ public abstract class Counter<T> {
    */
   private static class StringCounter extends Counter<String> {
     /** Initializes a new {@link Counter} for {@link String} values. */
-    private StringCounter(String name, AggregationKind kind) {
+    private StringCounter(CounterName name, AggregationKind kind) {
       super(name, kind);
       // TODO: Support MIN, MAX of Strings.
       throw illegalArgumentException();
@@ -908,7 +942,7 @@ public abstract class Counter<T> {
     private final AtomicReference<IntegerCounterMean> deltaMean;
 
     /** Initializes a new {@link Counter} for {@link Integer} values. */
-    private IntegerCounter(String name, AggregationKind kind) {
+    private IntegerCounter(CounterName name, AggregationKind kind) {
       super(name, kind);
       switch (kind) {
         case MEAN:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
new file mode 100644
index 0000000..b46be98
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterName.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.common;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Strings;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The name of a counter identifies the user-specified name, as well as the 
origin,
+ * the step the counter is associated with, and a prefix to add to the name.
+ *
+ * <p>For backwards compatibility, the {@link CounterName} will be converted to
+ * a flat name (string) during the migration.
+ */
+public class CounterName {
+  /**
+   * Returns a {@link CounterName} with the given name.
+   */
+  public static CounterName named(String name) {
+    return new CounterName(name, "", "", "");
+  }
+
+  /**
+   * Returns a msecs {@link CounterName}.
+   */
+  public static CounterName msecs(String name) {
+    return named(name + "-msecs");
+  }
+
+  /**
+   * Returns a {@link CounterName} identical to this, but with the given 
origin.
+   */
+  public CounterName withOrigin(String origin) {
+    return new CounterName(this.name, origin, this.stepName, this.prefix);
+  }
+
+  /**
+   * Returns a {@link CounterName} identical to this, but with the given step 
name.
+   */
+  public CounterName withStepName(String stepName) {
+    return new CounterName(this.name, this.origin, stepName, this.prefix);
+  }
+
+  /**
+   * Returns a {@link CounterName} identical to this, but with the given 
prefix.
+   */
+  public CounterName withPrefix(String prefix) {
+    return new CounterName(this.name, this.origin, this.stepName, prefix);
+  }
+
+  /**
+   * Name of the counter.
+   *
+   * <p>For example, process-msecs, ElementCount.
+   */
+  private final String name;
+
+  /**
+   * Origin (namespace) of counter name.
+   *
+   * <p>For example, "user" for user-defined counters.
+   * It is empty for counters defined by the SDK or the runner.
+   */
+  private final String origin;
+
+  /**
+   * System defined step name or the named-output of a step.
+   *
+   * <p>For example, {@code s1} or {@code s2.out}.
+   * It may be empty when counters don't associate with step names.
+   */
+  private final String stepName;
+
+  /**
+   * Prefix of group of counters.
+   *
+   * <p>It is empty when counters don't have general prefixes.
+   */
+  private final String prefix;
+
+  /**
+   * Flat name is the equivalent unstructured name.
+   *
+   * <p>It is null before {@link #getFlatName()} is called.
+   */
+  private AtomicReference<String> flatName;
+
+  private CounterName(String name, String origin, String stepName, String 
prefix) {
+    this.name = checkNotNull(name, "name");
+    this.origin = checkNotNull(origin, "origin");
+    this.stepName = checkNotNull(stepName, "stepName");
+    this.prefix = checkNotNull(prefix, "prefix");
+    this.flatName = new AtomicReference<String>();
+  }
+
+  /**
+   * Returns the flat name of a structured counter.
+   */
+  public String getFlatName() {
+    String ret = flatName.get();
+    if (ret == null) {
+      StringBuilder sb = new StringBuilder();
+      if (!Strings.isNullOrEmpty(prefix)) {
+        // Not all runner versions use "-" to concatenate prefix, it may 
already have it in it.
+        sb.append(prefix);
+      }
+      if (!Strings.isNullOrEmpty(origin)) {
+        sb.append(origin + "-");
+      }
+      if (!Strings.isNullOrEmpty(stepName)) {
+        sb.append(stepName + "-");
+      }
+      sb.append(name);
+      flatName.compareAndSet(null, sb.toString());
+      ret = flatName.get();
+    }
+    return ret;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (o instanceof CounterName) {
+      CounterName that = (CounterName) o;
+      return this.getFlatName().equals(that.getFlatName());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return getFlatName().hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
index 699d7d3..cb0ffe5 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/CounterSet.java
@@ -74,10 +74,11 @@ public class CounterSet extends AbstractSet<Counter<?>> {
    * name but an incompatible kind had already been added
    */
   public synchronized <T> Counter<T> addOrReuseCounter(Counter<T> counter) {
-    Counter<?> oldCounter = counters.get(counter.getName());
+    String flatName = counter.getFlatName();
+    Counter<?> oldCounter = counters.get(flatName);
     if (oldCounter == null) {
       // A new counter.
-      counters.put(counter.getName(), counter);
+      counters.put(flatName, counter);
       return counter;
     }
     if (counter.isCompatibleWith(oldCounter)) {
@@ -125,16 +126,16 @@ public class CounterSet extends AbstractSet<Counter<?>> {
     if (null == e) {
       return false;
     }
-    if (counters.containsKey(e.getName())) {
+    if (counters.containsKey(e.getFlatName())) {
       return false;
     }
-    counters.put(e.getName(), e);
+    counters.put(e.getFlatName(), e);
     return true;
   }
 
   public synchronized void merge(CounterSet that) {
     for (Counter<?> theirCounter : that) {
-      Counter<?> myCounter = counters.get(theirCounter.getName());
+      Counter<?> myCounter = counters.get(theirCounter.getFlatName());
       if (myCounter != null) {
         mergeCounters(myCounter, theirCounter);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd40bff4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
index bc16bdd..ee95260 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/worker/StateSampler.java
@@ -262,7 +262,7 @@ public class StateSampler implements AutoCloseable {
    */
   public synchronized StateSamplerInfo getInfo() {
     return currentState == DO_NOT_SAMPLE ? null
-        : new StateSamplerInfo(countersByState.get(currentState).getName(),
+        : new StateSamplerInfo(countersByState.get(currentState).getFlatName(),
             stateTransitionCount, null);
   }
 

Reply via email to