Repository: beam Updated Branches: refs/heads/release-2.0.0 bad377c67 -> f7e85e230
http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index 9286ea9..096d147 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -118,7 +118,7 @@ public class Metrics { @Override public void inc(long n) { MetricsContainer container = MetricsEnvironment.getCurrentContainer(); if (container != null) { - container.getCounter(name).inc(n); + container.getCounter(name).update(n); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index fbb0da3..48fa359 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.metrics; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.ImmutableList; +import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -37,7 +38,7 @@ import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; * cumulative values/updates. */ @Experimental(Kind.METRICS) -public class MetricsContainer { +public class MetricsContainer implements Serializable { private final String stepName; @@ -96,7 +97,7 @@ public class MetricsContainer { return gauges.get(metricName); } - private <UpdateT, CellT extends MetricCell<UpdateT>> + private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>> ImmutableList<MetricUpdate<UpdateT>> extractUpdates( MetricsMap<MetricName, CellT> cells) { ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); @@ -120,8 +121,8 @@ public class MetricsContainer { extractUpdates(gauges)); } - private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?>> cells) { - for (MetricCell<?> cell : cells.values()) { + private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) { + for (MetricCell<?, ?> cell : cells.values()) { cell.getDirty().afterCommit(); } } @@ -133,9 +134,10 @@ public class MetricsContainer { public void commitUpdates() { commitUpdates(counters); commitUpdates(distributions); + commitUpdates(gauges); } - private <UpdateT, CellT extends MetricCell<UpdateT>> + private <UserT extends Metric, UpdateT, CellT extends MetricCell<UserT, UpdateT>> ImmutableList<MetricUpdate<UpdateT>> extractCumulatives( MetricsMap<MetricName, CellT> cells) { ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); @@ -156,4 +158,21 @@ public class MetricsContainer { extractCumulatives(distributions), extractCumulatives(gauges)); } + + /** + * Update values of this {@link MetricsContainer} by merging the value of another cell. + */ + public void update(MetricsContainer other) { + updateCells(counters, other.counters); + updateCells(distributions, other.distributions); + updateCells(gauges, other.gauges); + } + + private <UserT extends Metric, DataT, CellT extends MetricCell<UserT, DataT>> void updateCells( + MetricsMap<MetricName, CellT> current, + MetricsMap<MetricName, CellT> updates) { + for (Map.Entry<MetricName, CellT> counter : updates.entries()) { + current.get(counter.getKey()).update(counter.getValue()); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java new file mode 100644 index 0000000..d01e970 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainerStepMap.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; + +/** + * Metrics containers by step. + * + * <p>This class is not thread-safe.</p> + */ +public class MetricsContainerStepMap implements Serializable { + private Map<String, MetricsContainer> metricsContainers; + + public MetricsContainerStepMap() { + this.metricsContainers = new ConcurrentHashMap<>(); + } + + /** + * Returns the container for the given step name. + */ + public MetricsContainer getContainer(String stepName) { + if (!metricsContainers.containsKey(stepName)) { + metricsContainers.put(stepName, new MetricsContainer(stepName)); + } + return metricsContainers.get(stepName); + } + + /** + * Update this {@link MetricsContainerStepMap} with all values from given + * {@link MetricsContainerStepMap}. + */ + public void updateAll(MetricsContainerStepMap other) { + for (Map.Entry<String, MetricsContainer> container : other.metricsContainers.entrySet()) { + getContainer(container.getKey()).update(container.getValue()); + } + } + + /** + * Update {@link MetricsContainer} for given step in this map with all values from given + * {@link MetricsContainer}. + */ + public void update(String step, MetricsContainer container) { + getContainer(step).update(container); + } + + /** + * Returns {@link MetricResults} based on given + * {@link MetricsContainerStepMap MetricsContainerStepMaps} of attempted and committed metrics. + * + * <p>This constructor is intended for runners which support both attempted and committed + * metrics. + */ + public static MetricResults asMetricResults( + MetricsContainerStepMap attemptedMetricsContainers, + MetricsContainerStepMap committedMetricsContainers) { + return new MetricsContainerStepMapMetricResults( + attemptedMetricsContainers, + committedMetricsContainers); + } + + /** + * Returns {@link MetricResults} based on given {@link MetricsContainerStepMap} of attempted + * metrics. + * + * <p>This constructor is intended for runners which only support `attempted` metrics. + * Accessing {@link MetricResult#committed()} in the resulting {@link MetricResults} will result + * in an {@link UnsupportedOperationException}.</p> + */ + public static MetricResults asAttemptedOnlyMetricResults( + MetricsContainerStepMap attemptedMetricsContainers) { + return new MetricsContainerStepMapMetricResults(attemptedMetricsContainers); + } + + private Map<String, MetricsContainer> getMetricsContainers() { + return metricsContainers; + } + + private static class MetricsContainerStepMapMetricResults extends MetricResults { + private final Map<MetricKey, AttemptedAndCommitted<Long>> counters = new HashMap<>(); + private final Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions = + new HashMap<>(); + private final Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges = new HashMap<>(); + private final boolean isCommittedSupported; + + private MetricsContainerStepMapMetricResults( + MetricsContainerStepMap attemptedMetricsContainers) { + this(attemptedMetricsContainers, new MetricsContainerStepMap(), false); + } + + private MetricsContainerStepMapMetricResults( + MetricsContainerStepMap attemptedMetricsContainers, + MetricsContainerStepMap committedMetricsContainers) { + this(attemptedMetricsContainers, committedMetricsContainers, true); + } + + private MetricsContainerStepMapMetricResults( + MetricsContainerStepMap attemptedMetricsContainers, + MetricsContainerStepMap committedMetricsContainers, + boolean isCommittedSupported) { + for (MetricsContainer container + : attemptedMetricsContainers.getMetricsContainers().values()) { + MetricUpdates cumulative = container.getCumulative(); + mergeCounters(counters, cumulative.counterUpdates(), attemptedCounterUpdateFn()); + mergeDistributions(distributions, cumulative.distributionUpdates(), + attemptedDistributionUpdateFn()); + mergeGauges(gauges, cumulative.gaugeUpdates(), attemptedGaugeUpdateFn()); + } + for (MetricsContainer container + : committedMetricsContainers.getMetricsContainers().values()) { + MetricUpdates cumulative = container.getCumulative(); + mergeCounters(counters, cumulative.counterUpdates(), committedCounterUpdateFn()); + mergeDistributions(distributions, cumulative.distributionUpdates(), + committedDistributionUpdateFn()); + mergeGauges(gauges, cumulative.gaugeUpdates(), committedGaugeUpdateFn()); + } + this.isCommittedSupported = isCommittedSupported; + } + + private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>> + attemptedDistributionUpdateFn() { + return new Function<MetricUpdate<DistributionData>, + AttemptedAndCommitted<DistributionData>>() { + @Override + public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) { + MetricKey key = input.getKey(); + return new AttemptedAndCommitted<>( + key, + input, + MetricUpdate.create(key, DistributionData.EMPTY)); + } + }; + } + + private Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>> + committedDistributionUpdateFn() { + return new Function<MetricUpdate<DistributionData>, + AttemptedAndCommitted<DistributionData>>() { + @Override + public AttemptedAndCommitted<DistributionData> apply(MetricUpdate<DistributionData> input) { + MetricKey key = input.getKey(); + return new AttemptedAndCommitted<>( + key, + MetricUpdate.create(key, DistributionData.EMPTY), + input); + } + }; + } + + private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>> + attemptedGaugeUpdateFn() { + return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() { + @Override + public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) { + MetricKey key = input.getKey(); + return new AttemptedAndCommitted<>( + key, + input, + MetricUpdate.create(key, GaugeData.empty())); + } + }; + } + + private Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>> + committedGaugeUpdateFn() { + return new Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>>() { + @Override + public AttemptedAndCommitted<GaugeData> apply(MetricUpdate<GaugeData> input) { + MetricKey key = input.getKey(); + return new AttemptedAndCommitted<>( + key, + MetricUpdate.create(key, GaugeData.empty()), + input); + } + }; + } + + private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> attemptedCounterUpdateFn() { + return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() { + @Override + public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) { + MetricKey key = input.getKey(); + return new AttemptedAndCommitted<>( + key, + input, + MetricUpdate.create(key, 0L)); + } + }; + } + + private Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> committedCounterUpdateFn() { + return new Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>>() { + @Override + public AttemptedAndCommitted<Long> apply(MetricUpdate<Long> input) { + MetricKey key = input.getKey(); + return new AttemptedAndCommitted<>( + key, + MetricUpdate.create(key, 0L), + input); + } + }; + } + + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { + return new QueryResults(filter); + } + + private class QueryResults implements MetricQueryResults { + private final MetricsFilter filter; + + private QueryResults(MetricsFilter filter) { + this.filter = filter; + } + + @Override + public Iterable<MetricResult<Long>> counters() { + return + FluentIterable + .from(counters.values()) + .filter(matchesFilter(filter)) + .transform(counterUpdateToResult()) + .toList(); + } + + @Override + public Iterable<MetricResult<DistributionResult>> distributions() { + return + FluentIterable + .from(distributions.values()) + .filter(matchesFilter(filter)) + .transform(distributionUpdateToResult()) + .toList(); + } + + @Override + public Iterable<MetricResult<GaugeResult>> gauges() { + return + FluentIterable + .from(gauges.values()) + .filter(matchesFilter(filter)) + .transform(gaugeUpdateToResult()) + .toList(); + } + + private Predicate<AttemptedAndCommitted<?>> matchesFilter(final MetricsFilter filter) { + return new Predicate<AttemptedAndCommitted<?>>() { + @Override + public boolean apply(AttemptedAndCommitted<?> attemptedAndCommitted) { + return MetricFiltering.matches(filter, attemptedAndCommitted.getKey()); + } + }; + } + } + + private Function<AttemptedAndCommitted<Long>, MetricResult<Long>> counterUpdateToResult() { + return new + Function<AttemptedAndCommitted<Long>, MetricResult<Long>>() { + @Override + public MetricResult<Long> + apply(AttemptedAndCommitted<Long> metricResult) { + MetricKey key = metricResult.getKey(); + return new AccumulatedMetricResult<>( + key.metricName(), + key.stepName(), + metricResult.getAttempted().getUpdate(), + isCommittedSupported + ? metricResult.getCommitted().getUpdate() + : null, + isCommittedSupported); + } + }; + } + + private Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>> + distributionUpdateToResult() { + return new + Function<AttemptedAndCommitted<DistributionData>, MetricResult<DistributionResult>>() { + @Override + public MetricResult<DistributionResult> + apply(AttemptedAndCommitted<DistributionData> metricResult) { + MetricKey key = metricResult.getKey(); + return new AccumulatedMetricResult<>( + key.metricName(), + key.stepName(), + metricResult.getAttempted().getUpdate().extractResult(), + isCommittedSupported + ? metricResult.getCommitted().getUpdate().extractResult() + : null, + isCommittedSupported); + } + }; + } + + private Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>> + gaugeUpdateToResult() { + return new + Function<AttemptedAndCommitted<GaugeData>, MetricResult<GaugeResult>>() { + @Override + public MetricResult<GaugeResult> + apply(AttemptedAndCommitted<GaugeData> metricResult) { + MetricKey key = metricResult.getKey(); + return new AccumulatedMetricResult<>( + key.metricName(), + key.stepName(), + metricResult.getAttempted().getUpdate().extractResult(), + isCommittedSupported + ? metricResult.getCommitted().getUpdate().extractResult() + : null, + isCommittedSupported); + } + }; + } + + @SuppressWarnings("ConstantConditions") + private void mergeCounters( + Map<MetricKey, AttemptedAndCommitted<Long>> counters, + Iterable<MetricUpdate<Long>> updates, + Function<MetricUpdate<Long>, AttemptedAndCommitted<Long>> updateToAttemptedAndCommittedFn) { + for (MetricUpdate<Long> metricUpdate : updates) { + MetricKey key = metricUpdate.getKey(); + AttemptedAndCommitted<Long> update = + updateToAttemptedAndCommittedFn.apply(metricUpdate); + if (counters.containsKey(key)) { + AttemptedAndCommitted<Long> current = counters.get(key); + update = new AttemptedAndCommitted<>( + key, + MetricUpdate.create( + key, + update.getAttempted().getUpdate() + current.getAttempted().getUpdate()), + MetricUpdate.create( + key, + update.getCommitted().getUpdate() + current.getCommitted().getUpdate())); + } + counters.put(key, update); + } + } + + @SuppressWarnings("ConstantConditions") + private void mergeDistributions( + Map<MetricKey, AttemptedAndCommitted<DistributionData>> distributions, + Iterable<MetricUpdate<DistributionData>> updates, + Function<MetricUpdate<DistributionData>, AttemptedAndCommitted<DistributionData>> + updateToAttemptedAndCommittedFn) { + for (MetricUpdate<DistributionData> metricUpdate : updates) { + MetricKey key = metricUpdate.getKey(); + AttemptedAndCommitted<DistributionData> update = + updateToAttemptedAndCommittedFn.apply(metricUpdate); + if (distributions.containsKey(key)) { + AttemptedAndCommitted<DistributionData> current = distributions.get(key); + update = new AttemptedAndCommitted<>( + key, + MetricUpdate.create( + key, + update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())), + MetricUpdate.create( + key, + update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate()))); + } + distributions.put(key, update); + } + } + + @SuppressWarnings("ConstantConditions") + private void mergeGauges( + Map<MetricKey, AttemptedAndCommitted<GaugeData>> gauges, + Iterable<MetricUpdate<GaugeData>> updates, + Function<MetricUpdate<GaugeData>, AttemptedAndCommitted<GaugeData>> + updateToAttemptedAndCommittedFn) { + for (MetricUpdate<GaugeData> metricUpdate : updates) { + MetricKey key = metricUpdate.getKey(); + AttemptedAndCommitted<GaugeData> update = + updateToAttemptedAndCommittedFn.apply(metricUpdate); + if (gauges.containsKey(key)) { + AttemptedAndCommitted<GaugeData> current = gauges.get(key); + update = new AttemptedAndCommitted<>( + key, + MetricUpdate.create( + key, + update.getAttempted().getUpdate().combine(current.getAttempted().getUpdate())), + MetricUpdate.create( + key, + update.getCommitted().getUpdate().combine(current.getCommitted().getUpdate()))); + } + gauges.put(key, update); + } + } + + /** + * Accumulated implementation of {@link MetricResult}. + */ + private static class AccumulatedMetricResult<T> implements MetricResult<T> { + private final MetricName name; + private final String step; + private final T attempted; + private final T committed; + private final boolean isCommittedSupported; + + private AccumulatedMetricResult( + MetricName name, + String step, + T attempted, + T committed, + boolean isCommittedSupported) { + this.name = name; + this.step = step; + this.attempted = attempted; + this.committed = committed; + this.isCommittedSupported = isCommittedSupported; + } + + @Override + public MetricName name() { + return name; + } + + @Override + public String step() { + return step; + } + + @Override + public T committed() { + if (!isCommittedSupported) { + throw new UnsupportedOperationException("This runner does not currently support committed" + + " metrics results. Please use 'attempted' instead."); + } + return committed; + } + + @Override + public T attempted() { + return attempted; + } + } + + /** + * Attempted and committed {@link MetricUpdate MetricUpdates}. + */ + private static class AttemptedAndCommitted<T> { + private final MetricKey key; + private final MetricUpdate<T> attempted; + private final MetricUpdate<T> committed; + + private AttemptedAndCommitted(MetricKey key, MetricUpdate<T> attempted, + MetricUpdate<T> committed) { + this.key = key; + this.attempted = attempted; + this.committed = committed; + } + + private MetricKey getKey() { + return key; + } + + private MetricUpdate<T> getAttempted() { + return attempted; + } + + private MetricUpdate<T> getCommitted() { + return committed; + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java index 5a02106..8c26f18 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.metrics; import com.google.common.base.MoreObjects; import com.google.common.collect.Iterables; +import java.io.Serializable; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -31,10 +32,10 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; * in a thread-safe manner. */ @Experimental(Kind.METRICS) -public class MetricsMap<K, T> { +public class MetricsMap<K, T> implements Serializable { /** Interface for creating instances to populate the {@link MetricsMap}. */ - public interface Factory<K, T> { + public interface Factory<K, T> extends Serializable { /** * Create an instance of {@code T} to use with the given {@code key}. * http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java index 408f145..26554d4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java @@ -35,8 +35,8 @@ public class CounterCellTest { @Test public void testDeltaAndCumulative() { - cell.inc(5); - cell.inc(7); + cell.update(5); + cell.update(7); assertThat(cell.getCumulative(), equalTo(12L)); assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L)); @@ -45,7 +45,7 @@ public class CounterCellTest { assertThat(cell.getDirty().beforeCommit(), equalTo(false)); assertThat(cell.getCumulative(), equalTo(12L)); - cell.inc(30); + cell.update(30); assertThat(cell.getCumulative(), equalTo(42L)); assertThat(cell.getDirty().beforeCommit(), equalTo(true)); http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java new file mode 100644 index 0000000..0428ce1 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerStepMapTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static org.apache.beam.sdk.metrics.MetricMatchers.metricsResult; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asMetricResults; +import static org.hamcrest.Matchers.hasItem; +import static org.junit.Assert.assertThat; + +import java.io.Closeable; +import java.io.IOException; +import org.hamcrest.collection.IsIterableWithSize; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Tests for {@link MetricsContainerStepMap}. + */ +public class MetricsContainerStepMapTest { + + private static final String NAMESPACE = MetricsContainerStepMapTest.class.getName(); + private static final String STEP1 = "myStep1"; + private static final String STEP2 = "myStep2"; + + private static final long VALUE = 100; + + private static final Counter counter = + Metrics.counter( + MetricsContainerStepMapTest.class, + "myCounter"); + private static final Distribution distribution = + Metrics.distribution( + MetricsContainerStepMapTest.class, + "myDistribution"); + private static final Gauge gauge = + Metrics.gauge( + MetricsContainerStepMapTest.class, + "myGauge"); + + private static final MetricsContainer metricsContainer; + + static { + metricsContainer = new MetricsContainer(null); + try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer)) { + counter.inc(VALUE); + distribution.update(VALUE); + distribution.update(VALUE * 2); + gauge.set(VALUE); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + assertIterableSize(step1res.counters(), 1); + assertIterableSize(step1res.distributions(), 1); + assertIterableSize(step1res.gauges(), 1); + + assertCounter(step1res, STEP1, VALUE, false); + assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), + false); + assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); + + MetricQueryResults step2res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); + + assertIterableSize(step2res.counters(), 1); + assertIterableSize(step2res.distributions(), 1); + assertIterableSize(step2res.gauges(), 1); + + assertCounter(step2res, STEP2, VALUE * 2, false); + assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), + false); + assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); + + MetricQueryResults allres = + metricResults.queryMetrics(MetricsFilter.builder().build()); + + assertIterableSize(allres.counters(), 2); + assertIterableSize(allres.distributions(), 2); + assertIterableSize(allres.gauges(), 2); + } + + @Test + public void testCounterCommittedUnsupportedInAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage("This runner does not currently support committed metrics results."); + + assertCounter(step1res, STEP1, VALUE, true); + } + + @Test + public void testDistributionCommittedUnsupportedInAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage("This runner does not currently support committed metrics results."); + + assertDistribution(step1res, STEP1, DistributionResult.ZERO, true); + } + + @Test + public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + MetricResults metricResults = + asAttemptedOnlyMetricResults(attemptedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + thrown.expect(UnsupportedOperationException.class); + thrown.expectMessage("This runner does not currently support committed metrics results."); + + assertGauge(step1res, STEP1, GaugeResult.empty(), true); + } + + @Test + public void testAttemptedAndCommittedAccumulatedMetricResults() { + MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap(); + attemptedMetrics.update(STEP1, metricsContainer); + attemptedMetrics.update(STEP1, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + attemptedMetrics.update(STEP2, metricsContainer); + + MetricsContainerStepMap committedMetrics = new MetricsContainerStepMap(); + committedMetrics.update(STEP1, metricsContainer); + committedMetrics.update(STEP2, metricsContainer); + committedMetrics.update(STEP2, metricsContainer); + + MetricResults metricResults = + asMetricResults(attemptedMetrics, committedMetrics); + + MetricQueryResults step1res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build()); + + assertIterableSize(step1res.counters(), 1); + assertIterableSize(step1res.distributions(), 1); + assertIterableSize(step1res.gauges(), 1); + + assertCounter(step1res, STEP1, VALUE * 2, false); + assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), + false); + assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false); + + assertCounter(step1res, STEP1, VALUE, true); + assertDistribution(step1res, STEP1, DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2), + true); + assertGauge(step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true); + + MetricQueryResults step2res = + metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build()); + + assertIterableSize(step2res.counters(), 1); + assertIterableSize(step2res.distributions(), 1); + assertIterableSize(step2res.gauges(), 1); + + assertCounter(step2res, STEP2, VALUE * 3, false); + assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2), + false); + assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false); + + assertCounter(step2res, STEP2, VALUE * 2, true); + assertDistribution(step2res, STEP2, DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2), + true); + assertGauge(step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true); + + MetricQueryResults allres = + metricResults.queryMetrics(MetricsFilter.builder().build()); + + assertIterableSize(allres.counters(), 2); + assertIterableSize(allres.distributions(), 2); + assertIterableSize(allres.gauges(), 2); + } + + private <T> void assertIterableSize(Iterable<T> iterable, int size) { + assertThat(iterable, IsIterableWithSize.<T>iterableWithSize(size)); + } + + private void assertCounter( + MetricQueryResults metricQueryResults, + String step, + Long expected, + boolean isCommitted) { + assertThat( + metricQueryResults.counters(), + hasItem(metricsResult(NAMESPACE, counter.getName().name(), step, expected, isCommitted))); + } + + private void assertDistribution( + MetricQueryResults metricQueryResults, + String step, + DistributionResult expected, + boolean isCommitted) { + assertThat( + metricQueryResults.distributions(), + hasItem(metricsResult(NAMESPACE, distribution.getName().name(), step, expected, + isCommitted))); + } + + private void assertGauge( + MetricQueryResults metricQueryResults, + String step, + GaugeResult expected, + boolean isCommitted) { + assertThat( + metricQueryResults.gauges(), + hasItem(metricsResult(NAMESPACE, gauge.getName().name(), step, expected, isCommitted))); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/3a4ffd2c/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java index 58797ce..38c00d3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java @@ -47,8 +47,8 @@ public class MetricsContainerTest { assertThat("After commit no counters should be dirty", container.getUpdates().counterUpdates(), emptyIterable()); - c1.inc(5L); - c2.inc(4L); + c1.update(5L); + c2.update(4L); assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder( metricUpdate("name1", 5L), @@ -63,7 +63,7 @@ public class MetricsContainerTest { assertThat("After commit there are no updates", container.getUpdates().counterUpdates(), emptyIterable()); - c1.inc(8L); + c1.update(8L); assertThat(container.getUpdates().counterUpdates(), contains( metricUpdate("name1", 13L))); } @@ -73,9 +73,9 @@ public class MetricsContainerTest { MetricsContainer container = new MetricsContainer("step1"); CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); - c1.inc(2L); - c2.inc(4L); - c1.inc(3L); + c1.update(2L); + c2.update(4L); + c1.update(3L); container.getUpdates(); container.commitUpdates(); @@ -84,7 +84,7 @@ public class MetricsContainerTest { metricUpdate("name1", 5L), metricUpdate("name2", 4L))); - c1.inc(8L); + c1.update(8L); assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder( metricUpdate("name1", 13L), metricUpdate("name2", 4L)));