Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 bad377c67 -> f7e85e230


http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 9286ea9..096d147 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -118,7 +118,7 @@ public class Metrics {
     @Override public void inc(long n) {
       MetricsContainer container = MetricsEnvironment.getCurrentContainer();
       if (container != null) {
-        container.getCounter(name).inc(n);
+        container.getCounter(name).update(n);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index fbb0da3..48fa359 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.metrics;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -37,7 +38,7 @@ import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
  * cumulative values/updates.
  */
 @Experimental(Kind.METRICS)
-public class MetricsContainer {
+public class MetricsContainer implements Serializable {
 
   private final String stepName;
 
@@ -96,7 +97,7 @@ public class MetricsContainer {
     return gauges.get(metricName);
   }
 
-  private <UpdateT, CellT extends MetricCell<UpdateT>>
+  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, 
UpdateT>>
   ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
       MetricsMap<MetricName, CellT> cells) {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = 
ImmutableList.builder();
@@ -120,8 +121,8 @@ public class MetricsContainer {
         extractUpdates(gauges));
   }
 
-  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> 
cells) {
-    for (MetricCell<?> cell : cells.values()) {
+  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, 
?>> cells) {
+    for (MetricCell<?, ?> cell : cells.values()) {
       cell.getDirty().afterCommit();
     }
   }
@@ -133,9 +134,10 @@ public class MetricsContainer {
   public void commitUpdates() {
     commitUpdates(counters);
     commitUpdates(distributions);
+    commitUpdates(gauges);
   }
 
-  private <UpdateT, CellT extends MetricCell<UpdateT>>
+  private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, 
UpdateT>>
   ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
       MetricsMap<MetricName, CellT> cells) {
     ImmutableList.Builder<MetricUpdate<UpdateT>> updates = 
ImmutableList.builder();
@@ -156,4 +158,21 @@ public class MetricsContainer {
         extractCumulatives(distributions),
         extractCumulatives(gauges));
   }
+
+  /**
+   * Update values of this {@link MetricsContainer} by merging the value of 
another cell.
+   */
+  public void update(MetricsContainer other) {
+    updateCells(counters, other.counters);
+    updateCells(distributions, other.distributions);
+    updateCells(gauges, other.gauges);
+  }
+
+  private <UserT extends Metric, DataT, CellT extends MetricCell<UserT, 
DataT>> void updateCells(
+      MetricsMap<MetricName, CellT> current,
+      MetricsMap<MetricName, CellT> updates) {
+    for (Map.Entry<MetricName, CellT> counter : updates.entries()) {
+      current.get(counter.getKey()).update(counter.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
new file mode 100644
index 0000000..d01e970
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+
+/**
+ * Metrics containers by step.
+ *
+ * <p>This class is not thread-safe.</p>
+ */
+public class MetricsContainerStepMap implements Serializable {
+  private Map<String, MetricsContainer> metricsContainers;
+
+  public MetricsContainerStepMap() {
+    this.metricsContainers = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Returns the container for the given step name.
+   */
+  public MetricsContainer getContainer(String stepName) {
+    if (!metricsContainers.containsKey(stepName)) {
+      metricsContainers.put(stepName, new MetricsContainer(stepName));
+    }
+    return metricsContainers.get(stepName);
+  }
+
+  /**
+   * Update this {@link MetricsContainerStepMap} with all values from given
+   * {@link MetricsContainerStepMap}.
+   */
+  public void updateAll(MetricsContainerStepMap other) {
+    for (Map.Entry<String, MetricsContainer> container : 
other.metricsContainers.entrySet()) {
+      getContainer(container.getKey()).update(container.getValue());
+    }
+  }
+
+  /**
+   * Update {@link MetricsContainer} for given step in this map with all 
values from given
+   * {@link MetricsContainer}.
+   */
+  public void update(String step, MetricsContainer container) {
+    getContainer(step).update(container);
+  }
+
+  /**
+   * Returns {@link MetricResults} based on given
+   * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and 
committed metrics.
+   *
+   * <p>This constructor is intended for runners which support both attempted 
and committed
+   * metrics.
+   */
+  public static MetricResults asMetricResults(
+      MetricsContainerStepMap attemptedMetricsContainers,
+      MetricsContainerStepMap committedMetricsContainers) {
+    return new MetricsContainerStepMapMetricResults(
+        attemptedMetricsContainers,
+        committedMetricsContainers);
+  }
+
+  /**
+   * Returns {@link MetricResults} based on given {@link 
MetricsContainerStepMap} of attempted
+   * metrics.
+   *
+   * <p>This constructor is intended for runners which only support 
`attempted` metrics.
+   * Accessing {@link MetricResult#committed()} in the resulting {@link 
MetricResults} will result
+   * in an {@link UnsupportedOperationException}.</p>
+   */
+  public static MetricResults asAttemptedOnlyMetricResults(
+      MetricsContainerStepMap attemptedMetricsContainers) {
+    return new 
MetricsContainerStepMapMetricResults(attemptedMetricsContainers);
+  }
+
+  private Map<String, MetricsContainer> getMetricsContainers() {
+    return metricsContainers;
+  }
+
+  private static class MetricsContainerStepMapMetricResults extends 
MetricResults {
+    private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new 
HashMap<>();
+    private final Map<MetricKey, AttemptedAndCommitted<DistributionData>> 
distributions =
+        new HashMap<>();
+    private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges = 
new HashMap<>();
+    private final boolean isCommittedSupported;
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers) {
+      this(attemptedMetricsContainers, new MetricsContainerStepMap(), false);
+    }
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers,
+        MetricsContainerStepMap committedMetricsContainers) {
+      this(attemptedMetricsContainers, committedMetricsContainers, true);
+    }
+
+    private MetricsContainerStepMapMetricResults(
+        MetricsContainerStepMap attemptedMetricsContainers,
+        MetricsContainerStepMap committedMetricsContainers,
+        boolean isCommittedSupported) {
+      for (MetricsContainer container
+          : attemptedMetricsContainers.getMetricsContainers().values()) {
+        MetricUpdates cumulative = container.getCumulative();
+        mergeCounters(counters, cumulative.counterUpdates(), 
attemptedCounterUpdateFn());
+        mergeDistributions(distributions, cumulative.distributionUpdates(),
+            attemptedDistributionUpdateFn());
+        mergeGauges(gauges, cumulative.gaugeUpdates(), 
attemptedGaugeUpdateFn());
+      }
+      for (MetricsContainer container
+          : committedMetricsContainers.getMetricsContainers().values()) {
+        MetricUpdates cumulative = container.getCumulative();
+        mergeCounters(counters, cumulative.counterUpdates(), 
committedCounterUpdateFn());
+        mergeDistributions(distributions, cumulative.distributionUpdates(),
+            committedDistributionUpdateFn());
+        mergeGauges(gauges, cumulative.gaugeUpdates(), 
committedGaugeUpdateFn());
+      }
+      this.isCommittedSupported = isCommittedSupported;
+    }
+
+    private Function<MetricUpdate<DistributionData>, 
AttemptedAndCommitted<DistributionData>>
+    attemptedDistributionUpdateFn() {
+      return new Function<MetricUpdate<DistributionData>,
+          AttemptedAndCommitted<DistributionData>>() {
+        @Override
+        public AttemptedAndCommitted<DistributionData> 
apply(MetricUpdate<DistributionData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, DistributionData.EMPTY));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<DistributionData>, 
AttemptedAndCommitted<DistributionData>>
+    committedDistributionUpdateFn() {
+      return new Function<MetricUpdate<DistributionData>,
+          AttemptedAndCommitted<DistributionData>>() {
+        @Override
+        public AttemptedAndCommitted<DistributionData> 
apply(MetricUpdate<DistributionData> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, DistributionData.EMPTY),
+              input);
+        }
+      };
+    }
+
+    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+    attemptedGaugeUpdateFn() {
+      return new Function<MetricUpdate<GaugeData>, 
AttemptedAndCommitted<GaugeData>>() {
+        @Override
+        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> 
input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, GaugeData.empty()));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+    committedGaugeUpdateFn() {
+      return new Function<MetricUpdate<GaugeData>, 
AttemptedAndCommitted<GaugeData>>() {
+        @Override
+        public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> 
input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, GaugeData.empty()),
+              input);
+        }
+      };
+    }
+
+    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> 
attemptedCounterUpdateFn() {
+      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+        @Override
+        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              input,
+              MetricUpdate.create(key, 0L));
+        }
+      };
+    }
+
+    private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> 
committedCounterUpdateFn() {
+      return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() {
+        @Override
+        public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) {
+          MetricKey key = input.getKey();
+          return new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(key, 0L),
+              input);
+        }
+      };
+    }
+
+    @Override
+    public MetricQueryResults queryMetrics(MetricsFilter filter) {
+      return new QueryResults(filter);
+    }
+
+    private class QueryResults implements MetricQueryResults {
+      private final MetricsFilter filter;
+
+      private QueryResults(MetricsFilter filter) {
+        this.filter = filter;
+      }
+
+      @Override
+      public Iterable<MetricResult<Long>> counters() {
+        return
+            FluentIterable
+                .from(counters.values())
+                .filter(matchesFilter(filter))
+                .transform(counterUpdateToResult())
+                .toList();
+      }
+
+      @Override
+      public Iterable<MetricResult<DistributionResult>> distributions() {
+        return
+            FluentIterable
+                .from(distributions.values())
+                .filter(matchesFilter(filter))
+                .transform(distributionUpdateToResult())
+                .toList();
+      }
+
+      @Override
+      public Iterable<MetricResult<GaugeResult>> gauges() {
+        return
+            FluentIterable
+                .from(gauges.values())
+                .filter(matchesFilter(filter))
+                .transform(gaugeUpdateToResult())
+                .toList();
+      }
+
+      private Predicate<AttemptedAndCommitted<?>> matchesFilter(final 
MetricsFilter filter) {
+        return new Predicate<AttemptedAndCommitted<?>>() {
+          @Override
+          public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted) 
{
+            return MetricFiltering.matches(filter, 
attemptedAndCommitted.getKey());
+          }
+        };
+      }
+    }
+
+    private Function<AttemptedAndCommitted<Long>, MetricResult<Long>> 
counterUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() {
+            @Override
+            public MetricResult<Long>
+            apply(AttemptedAndCommitted<Long> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    private Function<AttemptedAndCommitted<DistributionData>, 
MetricResult<DistributionResult>>
+    distributionUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<DistributionData>, 
MetricResult<DistributionResult>>() {
+            @Override
+            public MetricResult<DistributionResult>
+            apply(AttemptedAndCommitted<DistributionData> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate().extractResult(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate().extractResult()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    private Function<AttemptedAndCommitted<GaugeData>, 
MetricResult<GaugeResult>>
+    gaugeUpdateToResult() {
+      return new
+          Function<AttemptedAndCommitted<GaugeData>, 
MetricResult<GaugeResult>>() {
+            @Override
+            public MetricResult<GaugeResult>
+            apply(AttemptedAndCommitted<GaugeData> metricResult) {
+              MetricKey key = metricResult.getKey();
+              return new AccumulatedMetricResult<>(
+                  key.metricName(),
+                  key.stepName(),
+                  metricResult.getAttempted().getUpdate().extractResult(),
+                  isCommittedSupported
+                      ? metricResult.getCommitted().getUpdate().extractResult()
+                      : null,
+                  isCommittedSupported);
+            }
+          };
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeCounters(
+        Map<MetricKey, AttemptedAndCommitted<Long>> counters,
+        Iterable<MetricUpdate<Long>> updates,
+        Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> 
updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<Long> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<Long> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (counters.containsKey(key)) {
+          AttemptedAndCommitted<Long> current = counters.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  update.getAttempted().getUpdate() + 
current.getAttempted().getUpdate()),
+              MetricUpdate.create(
+                  key,
+                  update.getCommitted().getUpdate() + 
current.getCommitted().getUpdate()));
+        }
+        counters.put(key, update);
+      }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeDistributions(
+        Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions,
+        Iterable<MetricUpdate<DistributionData>> updates,
+        Function<MetricUpdate<DistributionData>, 
AttemptedAndCommitted<DistributionData>>
+            updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<DistributionData> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<DistributionData> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (distributions.containsKey(key)) {
+          AttemptedAndCommitted<DistributionData> current = 
distributions.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  
update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+              MetricUpdate.create(
+                  key,
+                  
update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+        }
+        distributions.put(key, update);
+      }
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private void mergeGauges(
+        Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges,
+        Iterable<MetricUpdate<GaugeData>> updates,
+        Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>
+            updateToAttemptedAndCommittedFn) {
+      for (MetricUpdate<GaugeData> metricUpdate : updates) {
+        MetricKey key = metricUpdate.getKey();
+        AttemptedAndCommitted<GaugeData> update =
+            updateToAttemptedAndCommittedFn.apply(metricUpdate);
+        if (gauges.containsKey(key)) {
+          AttemptedAndCommitted<GaugeData> current = gauges.get(key);
+          update = new AttemptedAndCommitted<>(
+              key,
+              MetricUpdate.create(
+                  key,
+                  
update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())),
+              MetricUpdate.create(
+                  key,
+                  
update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate())));
+        }
+        gauges.put(key, update);
+      }
+    }
+
+    /**
+     * Accumulated implementation of {@link MetricResult}.
+     */
+    private static class AccumulatedMetricResult<T> implements MetricResult<T> 
{
+      private final MetricName name;
+      private final String step;
+      private final T attempted;
+      private final T committed;
+      private final boolean isCommittedSupported;
+
+      private AccumulatedMetricResult(
+          MetricName name,
+          String step,
+          T attempted,
+          T committed,
+          boolean isCommittedSupported) {
+        this.name = name;
+        this.step = step;
+        this.attempted = attempted;
+        this.committed = committed;
+        this.isCommittedSupported = isCommittedSupported;
+      }
+
+      @Override
+      public MetricName name() {
+        return name;
+      }
+
+      @Override
+      public String step() {
+        return step;
+      }
+
+      @Override
+      public T committed() {
+        if (!isCommittedSupported) {
+          throw new UnsupportedOperationException("This runner does not 
currently support committed"
+              + " metrics results. Please use 'attempted' instead.");
+        }
+        return committed;
+      }
+
+      @Override
+      public T attempted() {
+        return attempted;
+      }
+    }
+
+    /**
+     * Attempted and committed {@link MetricUpdate MetricUpdates}.
+     */
+    private static class AttemptedAndCommitted<T> {
+      private final MetricKey key;
+      private final MetricUpdate<T> attempted;
+      private final MetricUpdate<T> committed;
+
+      private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted,
+          MetricUpdate<T> committed) {
+        this.key = key;
+        this.attempted = attempted;
+        this.committed = committed;
+      }
+
+      private MetricKey getKey() {
+        return key;
+      }
+
+      private MetricUpdate<T> getAttempted() {
+        return attempted;
+      }
+
+      private MetricUpdate<T> getCommitted() {
+        return committed;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
index 5a02106..8c26f18 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.metrics;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Iterables;
+import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -31,10 +32,10 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
  * in a thread-safe manner.
  */
 @Experimental(Kind.METRICS)
-public class MetricsMap<K, T> {
+public class MetricsMap<K, T> implements Serializable {
 
   /** Interface for creating instances to populate the {@link MetricsMap}. */
-  public interface Factory<K, T> {
+  public interface Factory<K, T> extends Serializable {
     /**
      * Create an instance of {@code T} to use with the given {@code key}.
      *

http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
index 408f145..26554d4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
@@ -35,8 +35,8 @@ public class CounterCellTest {
 
   @Test
   public void testDeltaAndCumulative() {
-    cell.inc(5);
-    cell.inc(7);
+    cell.update(5);
+    cell.update(7);
     assertThat(cell.getCumulative(), equalTo(12L));
     assertThat("getCumulative is idempotent", cell.getCumulative(), 
equalTo(12L));
 
@@ -45,7 +45,7 @@ public class CounterCellTest {
     assertThat(cell.getDirty().beforeCommit(), equalTo(false));
     assertThat(cell.getCumulative(), equalTo(12L));
 
-    cell.inc(30);
+    cell.update(30);
     assertThat(cell.getCumulative(), equalTo(42L));
 
     assertThat(cell.getDirty().beforeCommit(), equalTo(true));

http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
new file mode 100644
index 0000000..0428ce1
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult;
+import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static 
org.apache.beam.sdk.metrics.MetricsContainerStepMap.asMetricResults;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import java.io.Closeable;
+import java.io.IOException;
+import org.hamcrest.collection.IsIterableWithSize;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Tests for {@link MetricsContainerStepMap}.
+ */
+public class MetricsContainerStepMapTest {
+
+  private static final String NAMESPACE = 
MetricsContainerStepMapTest.class.getName();
+  private static final String STEP1 = "myStep1";
+  private static final String STEP2 = "myStep2";
+
+  private static final long VALUE = 100;
+
+  private static final Counter counter =
+      Metrics.counter(
+          MetricsContainerStepMapTest.class,
+          "myCounter");
+  private static final Distribution distribution =
+      Metrics.distribution(
+          MetricsContainerStepMapTest.class,
+          "myDistribution");
+  private static final Gauge gauge =
+      Metrics.gauge(
+          MetricsContainerStepMapTest.class,
+          "myGauge");
+
+  private static final MetricsContainer metricsContainer;
+
+  static {
+    metricsContainer = new MetricsContainer(null);
+    try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer)) {
+      counter.inc(VALUE);
+      distribution.update(VALUE);
+      distribution.update(VALUE * 2);
+      gauge.set(VALUE);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Rule
+  public transient ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    assertIterableSize(step1res.counters(), 1);
+    assertIterableSize(step1res.distributions(), 1);
+    assertIterableSize(step1res.gauges(), 1);
+
+    assertCounter(step1res, STEP1, VALUE, false);
+    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 
2, VALUE, VALUE * 2),
+        false);
+    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), 
false);
+
+    MetricQueryResults step2res =
+        
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
+
+    assertIterableSize(step2res.counters(), 1);
+    assertIterableSize(step2res.distributions(), 1);
+    assertIterableSize(step2res.gauges(), 1);
+
+    assertCounter(step2res, STEP2, VALUE * 2, false);
+    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 
4, VALUE, VALUE * 2),
+        false);
+    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), 
false);
+
+    MetricQueryResults allres =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+
+    assertIterableSize(allres.counters(), 2);
+    assertIterableSize(allres.distributions(), 2);
+    assertIterableSize(allres.gauges(), 2);
+  }
+
+  @Test
+  public void 
testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("This runner does not currently support committed 
metrics results.");
+
+    assertCounter(step1res, STEP1, VALUE, true);
+  }
+
+  @Test
+  public void 
testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("This runner does not currently support committed 
metrics results.");
+
+    assertDistribution(step1res, STEP1, DistributionResult.ZERO, true);
+  }
+
+  @Test
+  public void 
testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    MetricResults metricResults =
+        asAttemptedOnlyMetricResults(attemptedMetrics);
+
+    MetricQueryResults step1res =
+        
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("This runner does not currently support committed 
metrics results.");
+
+    assertGauge(step1res, STEP1, GaugeResult.empty(), true);
+  }
+
+  @Test
+  public void testAttemptedAndCommittedAccumulatedMetricResults() {
+    MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP1, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+    attemptedMetrics.update(STEP2, metricsContainer);
+
+    MetricsContainerStepMap committedMetrics = new MetricsContainerStepMap();
+    committedMetrics.update(STEP1, metricsContainer);
+    committedMetrics.update(STEP2, metricsContainer);
+    committedMetrics.update(STEP2, metricsContainer);
+
+    MetricResults metricResults =
+        asMetricResults(attemptedMetrics, committedMetrics);
+
+    MetricQueryResults step1res =
+        
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+    assertIterableSize(step1res.counters(), 1);
+    assertIterableSize(step1res.distributions(), 1);
+    assertIterableSize(step1res.gauges(), 1);
+
+    assertCounter(step1res, STEP1, VALUE * 2, false);
+    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 6, 
4, VALUE, VALUE * 2),
+        false);
+    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), 
false);
+
+    assertCounter(step1res, STEP1, VALUE, true);
+    assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 
2, VALUE, VALUE * 2),
+        true);
+    assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), 
true);
+
+    MetricQueryResults step2res =
+        
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
+
+    assertIterableSize(step2res.counters(), 1);
+    assertIterableSize(step2res.distributions(), 1);
+    assertIterableSize(step2res.gauges(), 1);
+
+    assertCounter(step2res, STEP2, VALUE * 3, false);
+    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 9, 
6, VALUE, VALUE * 2),
+        false);
+    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), 
false);
+
+    assertCounter(step2res, STEP2, VALUE * 2, true);
+    assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 
4, VALUE, VALUE * 2),
+        true);
+    assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), 
true);
+
+    MetricQueryResults allres =
+        metricResults.queryMetrics(MetricsFilter.builder().build());
+
+    assertIterableSize(allres.counters(), 2);
+    assertIterableSize(allres.distributions(), 2);
+    assertIterableSize(allres.gauges(), 2);
+  }
+
+  private <T> void assertIterableSize(Iterable<T> iterable, int size) {
+    assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size));
+  }
+
+  private void assertCounter(
+      MetricQueryResults metricQueryResults,
+      String step,
+      Long expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.counters(),
+        hasItem(metricsResult(NAMESPACE, counter.getName().name(), step, 
expected, isCommitted)));
+  }
+
+  private void assertDistribution(
+      MetricQueryResults metricQueryResults,
+      String step,
+      DistributionResult expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.distributions(),
+        hasItem(metricsResult(NAMESPACE, distribution.getName().name(), step, 
expected,
+            isCommitted)));
+  }
+
+  private void assertGauge(
+      MetricQueryResults metricQueryResults,
+      String step,
+      GaugeResult expected,
+      boolean isCommitted) {
+    assertThat(
+        metricQueryResults.gauges(),
+        hasItem(metricsResult(NAMESPACE, gauge.getName().name(), step, 
expected, isCommitted)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
index 58797ce..38c00d3 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
@@ -47,8 +47,8 @@ public class MetricsContainerTest {
     assertThat("After commit no counters should be dirty",
         container.getUpdates().counterUpdates(), emptyIterable());
 
-    c1.inc(5L);
-    c2.inc(4L);
+    c1.update(5L);
+    c2.update(4L);
 
     assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
         metricUpdate("name1", 5L),
@@ -63,7 +63,7 @@ public class MetricsContainerTest {
     assertThat("After commit there are no updates",
         container.getUpdates().counterUpdates(), emptyIterable());
 
-    c1.inc(8L);
+    c1.update(8L);
     assertThat(container.getUpdates().counterUpdates(), contains(
         metricUpdate("name1", 13L)));
   }
@@ -73,9 +73,9 @@ public class MetricsContainerTest {
     MetricsContainer container = new MetricsContainer("step1");
     CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
     CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
-    c1.inc(2L);
-    c2.inc(4L);
-    c1.inc(3L);
+    c1.update(2L);
+    c2.update(4L);
+    c1.update(3L);
 
     container.getUpdates();
     container.commitUpdates();
@@ -84,7 +84,7 @@ public class MetricsContainerTest {
         metricUpdate("name1", 5L),
         metricUpdate("name2", 4L)));
 
-    c1.inc(8L);
+    c1.update(8L);
     assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
         metricUpdate("name1", 13L),
         metricUpdate("name2", 4L)));

Reply via email to