Repository: beam Updated Branches: refs/heads/master 026aec856 -> b26e10b44
[BEAM-1617] Add Gauge metric type to Java SDK Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/63e953c6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/63e953c6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/63e953c6 Branch: refs/heads/master Commit: 63e953c6026192e5e027f0bac183b86992480127 Parents: 026aec8 Author: Aviem Zur <aviem...@gmail.com> Authored: Fri Mar 3 14:42:23 2017 +0200 Committer: Aviem Zur <aviem...@gmail.com> Committed: Mon Mar 27 19:01:58 2017 +0300 ---------------------------------------------------------------------- .../beam/runners/direct/DirectMetrics.java | 59 +++++++++++++- .../beam/runners/direct/DirectMetricsTest.java | 42 ++++++++-- .../beam/runners/dataflow/DataflowMetrics.java | 16 +++- .../runners/spark/metrics/SparkBeamMetric.java | 4 + .../spark/metrics/SparkMetricResults.java | 27 +++++++ .../spark/metrics/SparkMetricsContainer.java | 20 +++++ .../java/org/apache/beam/sdk/metrics/Gauge.java | 32 ++++++++ .../org/apache/beam/sdk/metrics/GaugeCell.java | 60 +++++++++++++++ .../org/apache/beam/sdk/metrics/GaugeData.java | 81 ++++++++++++++++++++ .../apache/beam/sdk/metrics/GaugeResult.java | 61 +++++++++++++++ .../beam/sdk/metrics/MetricQueryResults.java | 3 + .../apache/beam/sdk/metrics/MetricUpdates.java | 11 ++- .../org/apache/beam/sdk/metrics/Metrics.java | 35 +++++++++ .../beam/sdk/metrics/MetricsContainer.java | 26 ++++++- .../apache/beam/sdk/metrics/GaugeCellTest.java | 48 ++++++++++++ .../apache/beam/sdk/metrics/MetricMatchers.java | 12 ++- .../apache/beam/sdk/metrics/MetricsTest.java | 37 +++++---- 17 files changed, 539 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index f04dc21..fb126fb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -33,6 +33,8 @@ import javax.annotation.concurrent.GuardedBy; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricFiltering; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; @@ -193,6 +195,28 @@ class DirectMetrics extends MetricResults { } }; + private static final MetricAggregation<GaugeData, GaugeResult> GAUGE = + new MetricAggregation<GaugeData, GaugeResult>() { + @Override + public GaugeData zero() { + return GaugeData.empty(); + } + + @Override + public GaugeData combine(Iterable<GaugeData> updates) { + GaugeData result = GaugeData.empty(); + for (GaugeData update : updates) { + result = result.combine(update); + } + return result; + } + + @Override + public GaugeResult extract(GaugeData data) { + return data.extractResult(); + } + }; + /** The current values of counters in memory. */ private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters = new MetricsMap<>(new MetricsMap.Factory<MetricKey, DirectMetric<Long, Long>>() { @@ -210,13 +234,23 @@ class DirectMetrics extends MetricResults { return new DirectMetric<>(DISTRIBUTION); } }); + private MetricsMap<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauges = + new MetricsMap<>( + new MetricsMap.Factory<MetricKey, DirectMetric<GaugeData, GaugeResult>>() { + @Override + public DirectMetric<GaugeData, GaugeResult> createInstance( + MetricKey unusedKey) { + return new DirectMetric<>(GAUGE); + } + }); @AutoValue abstract static class DirectMetricQueryResults implements MetricQueryResults { public static MetricQueryResults create( Iterable<MetricResult<Long>> counters, - Iterable<MetricResult<DistributionResult>> distributions) { - return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions); + Iterable<MetricResult<DistributionResult>> distributions, + Iterable<MetricResult<GaugeResult>> gauges) { + return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions, gauges); } } @@ -248,8 +282,15 @@ class DirectMetrics extends MetricResults { : distributions.entries()) { maybeExtractResult(filter, distributionResults, distribution); } + ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = + ImmutableList.builder(); + for (Entry<MetricKey, DirectMetric<GaugeData, GaugeResult>> gauge + : gauges.entries()) { + maybeExtractResult(filter, gaugeResults, gauge); + } - return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build()); + return DirectMetricQueryResults.create(counterResults.build(), distributionResults.build(), + gaugeResults.build()); } private <ResultT> void maybeExtractResult( @@ -274,6 +315,10 @@ class DirectMetrics extends MetricResults { distributions.get(distribution.getKey()) .updatePhysical(bundle, distribution.getUpdate()); } + for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) { + gauges.get(gauge.getKey()) + .updatePhysical(bundle, gauge.getUpdate()); + } } public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) { @@ -284,6 +329,10 @@ class DirectMetrics extends MetricResults { distributions.get(distribution.getKey()) .commitPhysical(bundle, distribution.getUpdate()); } + for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) { + gauges.get(gauge.getKey()) + .commitPhysical(bundle, gauge.getUpdate()); + } } /** Apply metric updates that represent new logical values from a bundle being committed. */ @@ -295,5 +344,9 @@ class DirectMetrics extends MetricResults { distributions.get(distribution.getKey()) .commitLogical(bundle, distribution.getUpdate()); } + for (MetricUpdate<GaugeData> gauge : updates.gaugeUpdates()) { + gauges.get(gauge.getKey()) + .commitLogical(bundle, gauge.getUpdate()); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index 7183124..ee51e9a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -29,12 +29,15 @@ import com.google.common.collect.ImmutableList; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -56,6 +59,7 @@ public class DirectMetricsTest { private static final MetricName NAME1 = MetricName.named("ns1", "name1"); private static final MetricName NAME2 = MetricName.named("ns1", "name2"); private static final MetricName NAME3 = MetricName.named("ns2", "name1"); + private static final MetricName NAME4 = MetricName.named("ns2", "name2"); private DirectMetrics metrics = new DirectMetrics(); @@ -73,14 +77,20 @@ public class DirectMetricsTest { MetricUpdate.create(MetricKey.create("step1", NAME2), 8L)), ImmutableList.of( MetricUpdate.create(MetricKey.create("step1", NAME1), - DistributionData.create(8, 2, 3, 5))))); + DistributionData.create(8, 2, 3, 5))), + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L))) + )); metrics.commitLogical(bundle1, MetricUpdates.create( ImmutableList.of( MetricUpdate.create(MetricKey.create("step2", NAME1), 7L), MetricUpdate.create(MetricKey.create("step1", NAME2), 4L)), ImmutableList.of( MetricUpdate.create(MetricKey.create("step1", NAME1), - DistributionData.create(4, 1, 4, 4))))); + DistributionData.create(4, 1, 4, 4))), + ImmutableList.of( + MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L))) + )); MetricQueryResults results = metrics.queryMetrics(MetricsFilter.builder().build()); assertThat(results.counters(), containsInAnyOrder( @@ -95,6 +105,12 @@ public class DirectMetricsTest { attemptedMetricsResult("ns1", "name1", "step1", DistributionResult.ZERO))); assertThat(results.distributions(), contains( committedMetricsResult("ns1", "name1", "step1", DistributionResult.create(12, 3, 3, 5)))); + assertThat(results.gauges(), contains( + attemptedMetricsResult("ns2", "name2", "step1", GaugeResult.empty()) + )); + assertThat(results.gauges(), contains( + committedMetricsResult("ns2", "name2", "step1", GaugeResult.create(27L, Instant.now())) + )); } @SuppressWarnings("unchecked") @@ -104,12 +120,16 @@ public class DirectMetricsTest { ImmutableList.of( MetricUpdate.create(MetricKey.create("step1", NAME1), 5L), MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)), - ImmutableList.<MetricUpdate<DistributionData>>of())); + ImmutableList.<MetricUpdate<DistributionData>>of(), + ImmutableList.<MetricUpdate<GaugeData>>of() + )); metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( MetricUpdate.create(MetricKey.create("step2", NAME1), 7L), MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)), - ImmutableList.<MetricUpdate<DistributionData>>of())); + ImmutableList.<MetricUpdate<DistributionData>>of(), + ImmutableList.<MetricUpdate<GaugeData>>of() + )); MetricQueryResults results = metrics.queryMetrics( MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build()); @@ -132,12 +152,14 @@ public class DirectMetricsTest { ImmutableList.of( MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L), MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)), - ImmutableList.<MetricUpdate<DistributionData>>of())); + ImmutableList.<MetricUpdate<DistributionData>>of(), + ImmutableList.<MetricUpdate<GaugeData>>of())); metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L), MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)), - ImmutableList.<MetricUpdate<DistributionData>>of())); + ImmutableList.<MetricUpdate<DistributionData>>of(), + ImmutableList.<MetricUpdate<GaugeData>>of())); MetricQueryResults results = metrics.queryMetrics( MetricsFilter.builder().addStep("Outer1").build()); @@ -161,12 +183,16 @@ public class DirectMetricsTest { ImmutableList.of( MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L), MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)), - ImmutableList.<MetricUpdate<DistributionData>>of())); + ImmutableList.<MetricUpdate<DistributionData>>of(), + ImmutableList.<MetricUpdate<GaugeData>>of() + )); metrics.updatePhysical(bundle1, MetricUpdates.create( ImmutableList.of( MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L), MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)), - ImmutableList.<MetricUpdate<DistributionData>>of())); + ImmutableList.<MetricUpdate<DistributionData>>of(), + ImmutableList.<MetricUpdate<GaugeData>>of() + )); MetricQueryResults results = metrics.queryMetrics( MetricsFilter.builder().addStep("Top1/Outer1").build()); http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java index c0d1883..9d28ef6 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricFiltering; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; @@ -130,6 +131,7 @@ class DataflowMetrics extends MetricResults { ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder(); ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults = ImmutableList.builder(); + ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults = ImmutableList.builder(); for (MetricKey metricKey : metricHashKeys) { String metricName = metricKey.metricName().name(); if (metricName.endsWith("[MIN]") || metricName.endsWith("[MAX]") @@ -149,19 +151,23 @@ class DataflowMetrics extends MetricResults { step, committed, attempted)); } } - return DataflowMetricQueryResults.create(counterResults.build(), distributionResults.build()); + return DataflowMetricQueryResults.create( + counterResults.build(), + distributionResults.build(), + gaugeResults.build()); } private MetricQueryResults queryServiceForMetrics(MetricsFilter filter) { List<com.google.api.services.dataflow.model.MetricUpdate> metricUpdates; ImmutableList<MetricResult<Long>> counters = ImmutableList.of(); ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of(); + ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of(); JobMetrics jobMetrics; try { jobMetrics = dataflowClient.getJobMetrics(dataflowPipelineJob.jobId); } catch (IOException e) { LOG.warn("Unable to query job metrics.\n"); - return DataflowMetricQueryResults.create(counters, distributions); + return DataflowMetricQueryResults.create(counters, distributions, gauges); } metricUpdates = jobMetrics.getMetrics(); return populateMetricQueryResults(metricUpdates, filter); @@ -189,8 +195,10 @@ class DataflowMetrics extends MetricResults { abstract static class DataflowMetricQueryResults implements MetricQueryResults { public static MetricQueryResults create( Iterable<MetricResult<Long>> counters, - Iterable<MetricResult<DistributionResult>> distributions) { - return new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions); + Iterable<MetricResult<DistributionResult>> distributions, + Iterable<MetricResult<GaugeResult>> gauges) { + return + new AutoValue_DataflowMetrics_DataflowMetricQueryResults(counters, distributions, gauges); } } http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java index 8328a1a..2d445a9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java @@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; @@ -53,6 +54,9 @@ class SparkBeamMetric implements Metric { metrics.put(renderName(metricResult) + ".max", result.max()); metrics.put(renderName(metricResult) + ".mean", result.mean()); } + for (MetricResult<GaugeResult> metricResult : metricQueryResults.gauges()) { + metrics.put(renderName(metricResult), metricResult.attempted().value()); + } return metrics; } http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java index a9651e2..c02027a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java @@ -25,6 +25,8 @@ import com.google.common.collect.FluentIterable; import java.util.Set; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricNameFilter; @@ -72,6 +74,16 @@ public class SparkMetricResults extends MetricResults { .toList(); } + @Override + public Iterable<MetricResult<GaugeResult>> gauges() { + return + FluentIterable + .from(SparkMetricsContainer.getGauges()) + .filter(matchesFilter(filter)) + .transform(TO_GAUGE_RESULT) + .toList(); + } + private Predicate<MetricUpdate<?>> matchesFilter(final MetricsFilter filter) { return new Predicate<MetricUpdate<?>>() { @Override @@ -146,6 +158,21 @@ public class SparkMetricResults extends MetricResults { } }; + private static final Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>> + TO_GAUGE_RESULT = + new Function<MetricUpdate<GaugeData>, MetricResult<GaugeResult>>() { + @Override + public MetricResult<GaugeResult> apply(MetricUpdate<GaugeData> metricResult) { + if (metricResult != null) { + MetricKey key = metricResult.getKey(); + return new SparkMetricResult<>(key.metricName(), key.stepName(), + metricResult.getUpdate().extractResult()); + } else { + return null; + } + } + }; + private static class SparkMetricResult<T> implements MetricResult<T> { private final MetricName name; private final String step; http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java index d376ce3..b6aa178 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricsContainer.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.GaugeData; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; @@ -47,6 +48,7 @@ public class SparkMetricsContainer implements Serializable { private final Map<MetricKey, MetricUpdate<Long>> counters = new HashMap<>(); private final Map<MetricKey, MetricUpdate<DistributionData>> distributions = new HashMap<>(); + private final Map<MetricKey, MetricUpdate<GaugeData>> gauges = new HashMap<>(); public MetricsContainer getContainer(String stepName) { if (metricsContainers == null) { @@ -76,9 +78,14 @@ public class SparkMetricsContainer implements Serializable { return sparkMetricsContainer.distributions.values(); } + static Collection<MetricUpdate<GaugeData>> getGauges() { + return getInstance().gauges.values(); + } + SparkMetricsContainer update(SparkMetricsContainer other) { this.updateCounters(other.counters.values()); this.updateDistributions(other.distributions.values()); + this.updateGauges(other.gauges.values()); return this; } @@ -101,6 +108,7 @@ public class SparkMetricsContainer implements Serializable { MetricUpdates cumulative = container.getCumulative(); this.updateCounters(cumulative.counterUpdates()); this.updateDistributions(cumulative.distributionUpdates()); + this.updateGauges(cumulative.gaugeUpdates()); } } } @@ -123,6 +131,18 @@ public class SparkMetricsContainer implements Serializable { } } + private void updateGauges(Iterable<MetricUpdate<GaugeData>> updates) { + for (MetricUpdate<GaugeData> update : updates) { + MetricKey key = update.getKey(); + MetricUpdate<GaugeData> current = gauges.get(key); + gauges.put( + key, + current != null + ? MetricUpdate.create(key, current.getUpdate().combine(update.getUpdate())) + : update); + } + } + private static class MetricsContainerCacheLoader extends CacheLoader<String, MetricsContainer> { @SuppressWarnings("NullableProblems") @Override http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java new file mode 100644 index 0000000..6c03c80 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Gauge.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; + +/** + * A metric that reports the latest value out of reported values. + * + * <p>Since metrics are collected from many workers the value may not be the absolute last, + * but one of the latest values.</p> + */ +@Experimental(Experimental.Kind.METRICS) +public interface Gauge extends Metric { + /** Set current value for this gauge. */ + void set(long value); +} http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java new file mode 100644 index 0000000..35ae822 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeCell.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.annotations.Experimental; + +/** + * Tracks the current value (and delta) for a {@link Gauge} metric. + * + * <p>This class generally shouldn't be used directly. The only exception is within a runner where + * a gauge is being reported for a specific step (rather than the gauge in the current + * context). In that case retrieving the underlying cell and reporting directly to it avoids a step + * of indirection. + */ +@Experimental(Experimental.Kind.METRICS) +public class GaugeCell implements MetricCell<Gauge, GaugeData>, Gauge { + + private final DirtyState dirty = new DirtyState(); + private final AtomicReference<GaugeData> gaugeValue = new AtomicReference<>(GaugeData.empty()); + + @Override + public void set(long value) { + GaugeData original; + do { + original = gaugeValue.get(); + } while (!gaugeValue.compareAndSet(original, original.combine(GaugeData.create(value)))); + dirty.afterModification(); + } + + @Override + public DirtyState getDirty() { + return dirty; + } + + @Override + public GaugeData getCumulative() { + return gaugeValue.get(); + } + + @Override + public Gauge getInterface() { + return this; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java new file mode 100644 index 0000000..bf3401d --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeData.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.joda.time.Instant; + +/** + * Data describing the gauge. This should retain enough detail that it can be combined with + * other {@link GaugeData}. + */ +@AutoValue +public abstract class GaugeData implements Serializable { + + public abstract long value(); + + public abstract Instant timestamp(); + + public static GaugeData create(long value) { + return new AutoValue_GaugeData(value, Instant.now()); + } + + public static GaugeData empty() { + return EmptyGaugeData.INSTANCE; + } + + public GaugeData combine(GaugeData other) { + if (this.timestamp().isAfter(other.timestamp())) { + return this; + } else { + return other; + } + } + + public GaugeResult extractResult() { + return GaugeResult.create(value(), timestamp()); + } + + /** + * Empty {@link GaugeData}, representing no values reported. + */ + public static class EmptyGaugeData extends GaugeData { + + private static final EmptyGaugeData INSTANCE = new EmptyGaugeData(); + private static final Instant EPOCH = new Instant(0); + + private EmptyGaugeData() { + } + + @Override + public long value() { + return -1L; + } + + @Override + public Instant timestamp() { + return EPOCH; + } + + @Override + public GaugeResult extractResult() { + return GaugeResult.empty(); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java new file mode 100644 index 0000000..878776a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/GaugeResult.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import org.joda.time.Instant; + +/** + * The result of a {@link Gauge} metric. + */ +@AutoValue +public abstract class GaugeResult { + public abstract long value(); + + public abstract Instant timestamp(); + + public static GaugeResult create(long value, Instant timestamp) { + return new AutoValue_GaugeResult(value, timestamp); + } + + public static GaugeResult empty() { + return EmptyGaugeResult.INSTANCE; + } + + /** + * Empty {@link GaugeResult}, representing no values reported. + */ + public static class EmptyGaugeResult extends GaugeResult { + + private static final EmptyGaugeResult INSTANCE = new EmptyGaugeResult(); + private static final Instant EPOCH = new Instant(0); + + private EmptyGaugeResult() { + } + + @Override + public long value() { + return -1L; + } + + @Override + public Instant timestamp() { + return EPOCH; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java index 2241ba8..a7838ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -30,4 +30,7 @@ public interface MetricQueryResults { /** Return the metric results for the distributions that matched the filter. */ Iterable<MetricResult<DistributionResult>> distributions(); + + /** Return the metric results for the gauges that matched the filter. */ + Iterable<MetricResult<GaugeResult>> gauges(); } http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java index 56466d8..9cf6a5c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java @@ -33,7 +33,8 @@ public abstract class MetricUpdates { public static final MetricUpdates EMPTY = MetricUpdates.create( Collections.<MetricUpdate<Long>>emptyList(), - Collections.<MetricUpdate<DistributionData>>emptyList()); + Collections.<MetricUpdate<DistributionData>>emptyList(), + Collections.<MetricUpdate<GaugeData>>emptyList()); /** * Representation of a single metric update. @@ -64,10 +65,14 @@ public abstract class MetricUpdates { /** All of the distribution updates. */ public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates(); + /** All of the gauges updates. */ + public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates(); + /** Create a new {@link MetricUpdates} bundle. */ public static MetricUpdates create( Iterable<MetricUpdate<Long>> counterUpdates, - Iterable<MetricUpdate<DistributionData>> distributionUpdates) { - return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates); + Iterable<MetricUpdate<DistributionData>> distributionUpdates, + Iterable<MetricUpdate<GaugeData>> gaugeUpdates) { + return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, gaugeUpdates); } } http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java index 045e076..121698d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -58,6 +58,22 @@ public class Metrics { return new DelegatingDistribution(MetricName.named(namespace, name)); } + /** + * Create a metric that can have its new value set, and is aggregated by taking the last reported + * value. + */ + public static Gauge gauge(String namespace, String name) { + return new DelegatingGauge(MetricName.named(namespace, name)); + } + + /** + * Create a metric that can have its new value set, and is aggregated by taking the last reported + * value. + */ + public static Gauge gauge(Class<?> namespace, String name) { + return new DelegatingGauge(MetricName.named(namespace, name)); + } + /** Implementation of {@link Counter} that delegates to the instance for the current context. */ private static class DelegatingCounter implements Counter, Serializable { private final MetricName name; @@ -108,4 +124,23 @@ public class Metrics { } } } + + /** + * Implementation of {@link Gauge} that delegates to the instance for the current context. + */ + private static class DelegatingGauge implements Gauge, Serializable { + private final MetricName name; + + private DelegatingGauge(MetricName name) { + this.name = name; + } + + @Override + public void set(long value) { + MetricsContainer container = MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getGauge(name).set(value); + } + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java index ba5a343..5812ec6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -57,6 +57,14 @@ public class MetricsContainer { } }); + private MetricsMap<MetricName, GaugeCell> gauges = + new MetricsMap<>(new MetricsMap.Factory<MetricName, GaugeCell>() { + @Override + public GaugeCell createInstance(MetricName unusedKey) { + return new GaugeCell(); + } + }); + /** * Create a new {@link MetricsContainer} associated with the given {@code stepName}. */ @@ -72,10 +80,22 @@ public class MetricsContainer { return counters.get(metricName); } + /** + * Return the {@link DistributionCell} that should be used for implementing the given + * {@code metricName} in this container. + */ public DistributionCell getDistribution(MetricName metricName) { return distributions.get(metricName); } + /** + * Return the {@link GaugeCell} that should be used for implementing the given + * {@code metricName} in this container. + */ + public GaugeCell getGauge(MetricName metricName) { + return gauges.get(metricName); + } + private <UpdateT, CellT extends MetricCell<?, UpdateT>> ImmutableList<MetricUpdate<UpdateT>> extractUpdates( MetricsMap<MetricName, CellT> cells) { @@ -96,7 +116,8 @@ public class MetricsContainer { public MetricUpdates getUpdates() { return MetricUpdates.create( extractUpdates(counters), - extractUpdates(distributions)); + extractUpdates(distributions), + extractUpdates(gauges)); } private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) { @@ -132,6 +153,7 @@ public class MetricsContainer { public MetricUpdates getCumulative() { return MetricUpdates.create( extractCumulatives(counters), - extractCumulatives(distributions)); + extractCumulatives(distributions), + extractCumulatives(gauges)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java new file mode 100644 index 0000000..d8ef928 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/GaugeCellTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import org.junit.Test; + +/** + * Tests for {@link GaugeCell}. + */ +public class GaugeCellTest { + private GaugeCell cell = new GaugeCell(); + + @Test + public void testDeltaAndCumulative() { + cell.set(5); + cell.set(7); + assertThat(cell.getCumulative().value(), equalTo(GaugeData.create(7).value())); + assertThat("getCumulative is idempotent", + cell.getCumulative().value(), equalTo(7L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + + cell.set(30); + assertThat(cell.getCumulative().value(), equalTo(30L)); + + assertThat("Adding a new value made the cell dirty", + cell.getDirty().beforeCommit(), equalTo(true)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java index 5de8894..2251c82 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -92,7 +92,7 @@ public class MetricMatchers { return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) && item.step().contains(step) - && Objects.equals(attempted, item.attempted()); + && metricResultsEqual(attempted, item.attempted()); } @Override @@ -135,7 +135,7 @@ public class MetricMatchers { return Objects.equals(namespace, item.name().namespace()) && Objects.equals(name, item.name().name()) && item.step().contains(step) - && Objects.equals(committed, item.committed()); + && metricResultsEqual(committed, item.committed()); } @Override @@ -165,6 +165,14 @@ public class MetricMatchers { }; } + private static <T> boolean metricResultsEqual(T result1, T result2) { + if (result1 instanceof GaugeResult) { + return (((GaugeResult) result1).value()) == (((GaugeResult) result2).value()); + } else { + return result1.equals(result2); + } + } + static Matcher<MetricResult<DistributionResult>> distributionAttemptedMinMax( final String namespace, final String name, final String step, final Long attemptedMin, final Long attemptedMax) { http://git-wip-us.apache.org/repos/asf/beam/blob/63e953c6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index fc9e18b..697ff5a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.hamcrest.CoreMatchers; +import org.joda.time.Instant; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -52,6 +53,7 @@ public class MetricsTest implements Serializable { private static final String NS = "test"; private static final String NAME = "name"; private static final MetricName METRIC_NAME = MetricName.named(NS, NAME); + private static final String NAMESPACE = MetricsTest.class.getName(); @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -127,19 +129,22 @@ public class MetricsTest implements Serializable { .build()); assertThat(metrics.counters(), hasItem( - committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L))); + committedMetricsResult(NAMESPACE, "count", "MyStep1", 3L))); assertThat(metrics.distributions(), hasItem( - committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1", + committedMetricsResult(NAMESPACE, "input", "MyStep1", DistributionResult.create(26L, 3L, 5L, 13L)))); assertThat(metrics.counters(), hasItem( - committedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L))); + committedMetricsResult(NAMESPACE, "count", "MyStep2", 6L))); assertThat(metrics.distributions(), hasItem( - committedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2", + committedMetricsResult(NAMESPACE, "input", "MyStep2", DistributionResult.create(52L, 6L, 5L, 13L)))); + assertThat(metrics.gauges(), hasItem( + committedMetricsResult(NAMESPACE, "my-gauge", "MyStep2", + GaugeResult.create(12L, Instant.now())))); assertThat(metrics.distributions(), hasItem( - distributionCommittedMinMax(MetricsTest.class.getName(), "bundle", "MyStep1", 10L, 40L))); + distributionCommittedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L))); } @@ -154,19 +159,22 @@ public class MetricsTest implements Serializable { // TODO: BEAM-1169: Metrics shouldn't verify the physical values tightly. assertThat(metrics.counters(), hasItem( - attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep1", 3L))); + attemptedMetricsResult(NAMESPACE, "count", "MyStep1", 3L))); assertThat(metrics.distributions(), hasItem( - attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep1", + attemptedMetricsResult(NAMESPACE, "input", "MyStep1", DistributionResult.create(26L, 3L, 5L, 13L)))); assertThat(metrics.counters(), hasItem( - attemptedMetricsResult(MetricsTest.class.getName(), "count", "MyStep2", 6L))); + attemptedMetricsResult(NAMESPACE, "count", "MyStep2", 6L))); assertThat(metrics.distributions(), hasItem( - attemptedMetricsResult(MetricsTest.class.getName(), "input", "MyStep2", + attemptedMetricsResult(NAMESPACE, "input", "MyStep2", DistributionResult.create(52L, 6L, 5L, 13L)))); + assertThat(metrics.gauges(), hasItem( + attemptedMetricsResult(NAMESPACE, "my-gauge", "MyStep2", + GaugeResult.create(12L, Instant.now())))); assertThat(metrics.distributions(), hasItem( - distributionAttemptedMinMax(MetricsTest.class.getName(), "bundle", "MyStep1", 10L, 40L))); + distributionAttemptedMinMax(NAMESPACE, "bundle", "MyStep1", 10L, 40L))); } private PipelineResult runPipelineWithMetrics() { @@ -205,10 +213,13 @@ public class MetricsTest implements Serializable { @ProcessElement public void processElement(ProcessContext c) { Distribution values = Metrics.distribution(MetricsTest.class, "input"); + Gauge gauge = Metrics.gauge(MetricsTest.class, "my-gauge"); + Integer element = c.element(); count.inc(); - values.update(c.element()); - c.output(c.element()); - c.sideOutput(output2, c.element()); + values.update(element); + gauge.set(12L); + c.output(element); + c.sideOutput(output2, element); } })); PipelineResult result = pipeline.run();