Repository: incubator-beam Updated Branches: refs/heads/master e969f3d38 -> 3c731707b
Implement Metrics in the DirectRunner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/834933c5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/834933c5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/834933c5 Branch: refs/heads/master Commit: 834933c520997b4f83cf8b04219c2c63dac61e61 Parents: 51fee39 Author: bchambers <bchamb...@google.com> Authored: Wed Oct 12 10:55:53 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Thu Oct 13 15:29:29 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/DirectMetrics.java | 331 +++++++++++++++++++ .../beam/runners/direct/DirectRunner.java | 8 +- .../beam/runners/direct/EvaluationContext.java | 10 + .../direct/ExecutorServiceParallelExecutor.java | 1 + .../direct/ImmutableListBundleFactory.java | 10 + .../runners/direct/StepTransformResult.java | 49 ++- .../beam/runners/direct/TransformExecutor.java | 35 +- .../beam/runners/direct/TransformResult.java | 12 + .../beam/runners/direct/DirectMetricsTest.java | 133 ++++++++ .../beam/runners/direct/DirectRunnerTest.java | 36 ++ .../runners/direct/TransformExecutorTest.java | 12 + 11 files changed, 602 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java new file mode 100644 index 0000000..a749a76 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static java.util.Arrays.asList; + +import com.google.auto.value.AutoValue; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; +import 
org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.metrics.MetricsMap; + +/** + * Implementation of {@link MetricResults} for the Direct Runner. + */ +class DirectMetrics extends MetricResults { + + // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner. + private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool(); + + private interface MetricAggregation<UpdateT, ResultT> { + UpdateT zero(); + UpdateT combine(Iterable<UpdateT> updates); + ResultT extract(UpdateT data); + } + + /** + * Implementation of a metric in the direct runner. + * + * @param <UpdateT> The type of raw data received and aggregated across updates. + * @param <ResultT> The type of result extracted from the data. + */ + private static class DirectMetric<UpdateT, ResultT> { + private final MetricAggregation<UpdateT, ResultT> aggregation; + + private final AtomicReference<UpdateT> finishedCommitted; + + private final Object attemptedLock = new Object(); + @GuardedBy("attemptedLock") + private volatile UpdateT finishedAttempted; + @GuardedBy("attemptedLock") + private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted = + new ConcurrentHashMap<>(); + + public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) { + this.aggregation = aggregation; + finishedCommitted = new AtomicReference<>(aggregation.zero()); + finishedAttempted = aggregation.zero(); + } + + /** + * Add the given {@code tentativeCumulative} update to the physical aggregate. + * + * @param bundle The bundle receiving an update. + * @param tentativeCumulative The new cumulative value for the given bundle. + */ + public void updatePhysical(CommittedBundle<?> bundle, UpdateT tentativeCumulative) { + // Add (or update) the cumulative value for the given bundle. + inflightAttempted.put(bundle, tentativeCumulative); + } + + /** + * Commit a physical value for the given {@code bundle}. 
+ * + * @param bundle The bundle being committed. + * @param finalCumulative The final cumulative value for the given bundle. + */ + public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) { + // To prevent a query from blocking the commit, we perform the commit in two steps. + // 1. We perform a non-blocking write to the uncommitted table to make the new value + // available immediately. + // 2. We submit a runnable that will commit the update and remove the tentative value in + // a synchronized block. + inflightAttempted.put(bundle, finalCumulative); + COUNTER_COMMITTER.submit(new Runnable() { + @Override + public void run() { + synchronized (attemptedLock) { + finishedAttempted = aggregation.combine(asList(finishedAttempted, finalCumulative)); + inflightAttempted.remove(bundle); + } + } + }); + } + + /** Extract the latest values from all attempted and in-progress bundles. */ + public ResultT extractLatestAttempted() { + ArrayList<UpdateT> updates = new ArrayList<>(inflightAttempted.size() + 1); + // Within this block we know that the values will be consistent. Specifically, the only change that can + // happen concurrently is the addition of new (larger) values to inflightAttempted. + synchronized (attemptedLock) { + updates.add(finishedAttempted); + updates.addAll(inflightAttempted.values()); + } + return aggregation.extract(aggregation.combine(updates)); + } + + /** + * Commit a logical value for the given {@code bundle}. + * + * @param bundle The bundle being committed. + * @param finalCumulative The final cumulative value for the given bundle. + */ + public void commitLogical(final CommittedBundle<?> bundle, final UpdateT finalCumulative) { + UpdateT current; + do { + current = finishedCommitted.get(); + } while (!finishedCommitted.compareAndSet(current, + aggregation.combine(asList(current, finalCumulative)))); + } + + /** Extract the value from all successfully committed bundles. 
*/ + public ResultT extractCommitted() { + return aggregation.extract(finishedCommitted.get()); + } + } + + private static final MetricAggregation<Long, Long> COUNTER = + new MetricAggregation<Long, Long>() { + @Override + public Long zero() { + return 0L; + } + + @Override + public Long combine(Iterable<Long> updates) { + long value = 0; + for (long update : updates) { + value += update; + } + return value; + } + + @Override + public Long extract(Long data) { + return data; + } + }; + + private static final MetricAggregation<DistributionData, DistributionResult> DISTRIBUTION = + new MetricAggregation<DistributionData, DistributionResult>() { + @Override + public DistributionData zero() { + return DistributionData.EMPTY; + } + + @Override + public DistributionData combine(Iterable<DistributionData> updates) { + DistributionData result = DistributionData.EMPTY; + for (DistributionData update : updates) { + result = result.combine(update); + } + return result; + } + + @Override + public DistributionResult extract(DistributionData data) { + return data.extractResult(); + } + }; + + /** The current values of counters in memory. 
*/ + private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters = + new MetricsMap<>(new MetricsMap.Factory<MetricKey, DirectMetric<Long, Long>>() { + @Override + public DirectMetric<Long, Long> createInstance(MetricKey unusedKey) { + return new DirectMetric<>(COUNTER); + } + }); + private MetricsMap<MetricKey, DirectMetric<DistributionData, DistributionResult>> distributions = + new MetricsMap<>( + new MetricsMap.Factory<MetricKey, DirectMetric<DistributionData, DistributionResult>>() { + @Override + public DirectMetric<DistributionData, DistributionResult> createInstance( + MetricKey unusedKey) { + return new DirectMetric<>(DISTRIBUTION); + } + }); + + @AutoValue + abstract static class DirectMetricQueryResults implements MetricQueryResults { + public static MetricQueryResults create( + Iterable<MetricResult<Long>> counters, + Iterable<MetricResult<DistributionResult>> distributions) { + return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, distributions); + } + } + + @AutoValue + abstract static class DirectMetricResult<T> implements MetricResult<T> { + public static <T> MetricResult<T> create(MetricName name, String scope, + T committed, T attempted) { + return new AutoValue_DirectMetrics_DirectMetricResult<T>( + name, scope, committed, attempted); + } + } + + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { + ImmutableList.Builder<MetricResult<Long>> counterResults = ImmutableList.builder(); + for (Entry<MetricKey, DirectMetric<Long, Long>> counter : counters.entries()) { + maybeExtractResult(filter, counterResults, counter); + } + ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults = + ImmutableList.builder(); + for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> distribution + : distributions.entries()) { + maybeExtractResult(filter, distributionResults, distribution); + } + + return DirectMetricQueryResults.create(counterResults.build(), 
distributionResults.build()); + } + + private <ResultT> void maybeExtractResult( + MetricsFilter filter, + ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder, + Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) { + if (matches(filter, entry.getKey())) { + resultsBuilder.add(DirectMetricResult.create( + entry.getKey().metricName(), + entry.getKey().stepName(), + entry.getValue().extractCommitted(), + entry.getValue().extractLatestAttempted())); + } + } + + // Matching logic is implemented here rather than in MetricsFilter because we would like + // MetricsFilter to act as a "dumb" value-object, with the possibility of replacing it with + // a Proto/JSON/etc. schema object. + private boolean matches(MetricsFilter filter, MetricKey key) { + return matchesName(key.metricName(), filter.names()) + && matchesScope(key.stepName(), filter.steps()); + } + + private boolean matchesScope(String actualScope, Set<String> scopes) { + if (scopes.isEmpty() || scopes.contains(actualScope)) { + return true; + } + + for (String scope : scopes) { + if (actualScope.startsWith(scope)) { + return true; + } + } + + return false; + } + + private boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) { + if (nameFilters.isEmpty()) { + return true; + } + + for (MetricNameFilter nameFilter : nameFilters) { + if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) + && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { + return true; + } + } + + return false; + } + + /** Apply metric updates that represent physical counter deltas to the current metric values. 
*/ + public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) { + for (MetricUpdate<Long> counter : updates.counterUpdates()) { + counters.get(counter.getKey()).updatePhysical(bundle, counter.getUpdate()); + } + for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) { + distributions.get(distribution.getKey()) + .updatePhysical(bundle, distribution.getUpdate()); + } + } + + public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) { + for (MetricUpdate<Long> counter : updates.counterUpdates()) { + counters.get(counter.getKey()).commitPhysical(bundle, counter.getUpdate()); + } + for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) { + distributions.get(distribution.getKey()) + .commitPhysical(bundle, distribution.getUpdate()); + } + } + + /** Apply metric updates that represent new logical values from a bundle being committed. */ + public void commitLogical(CommittedBundle<?> bundle, MetricUpdates updates) { + for (MetricUpdate<Long> counter : updates.counterUpdates()) { + counters.get(counter.getKey()).commitLogical(bundle, counter.getUpdate()); + } + for (MetricUpdate<DistributionData> distribution : updates.distributionUpdates()) { + distributions.get(distribution.getKey()) + .commitLogical(bundle, distribution.getUpdate()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index e13046d..8941093 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -43,6 +43,7 @@ import 
org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.Write; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.testing.TestStream; @@ -226,6 +227,7 @@ public class DirectRunner @Override public DirectPipelineResult run(Pipeline pipeline) { + MetricsEnvironment.setMetricsSupported(true); ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor(); pipeline.traverseTopologically(consumerTrackingVisitor); for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) { @@ -268,8 +270,7 @@ public class DirectRunner Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = pipeline.getAggregatorSteps(); - DirectPipelineResult result = - new DirectPipelineResult(executor, context, aggregatorSteps); + DirectPipelineResult result = new DirectPipelineResult(executor, context, aggregatorSteps); if (options.isBlockOnRun()) { try { result.awaitCompletion(); @@ -383,8 +384,7 @@ public class DirectRunner @Override public MetricResults metrics() { - throw new UnsupportedOperationException( - "The DirectRunner does not currently support metrics."); + return evaluationContext.getMetrics(); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 2901254..e5a30d4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -96,6 +96,8 @@ class EvaluationContext { private final AggregatorContainer mergedAggregators; + private final DirectMetrics metrics; + public static EvaluationContext create( DirectOptions options, Clock clock, @@ -130,6 +132,7 @@ class EvaluationContext { this.applicationStateInternals = new ConcurrentHashMap<>(); this.mergedAggregators = AggregatorContainer.create(); + this.metrics = new DirectMetrics(); this.callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor()); @@ -161,6 +164,8 @@ class EvaluationContext { TransformResult result) { Iterable<? extends CommittedBundle<?>> committedBundles = commitBundles(result.getOutputBundles()); + metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates()); + // Update watermarks and timers EnumSet<OutputType> outputTypes = EnumSet.copyOf(result.getOutputTypes()); if (Iterables.isEmpty(committedBundles)) { @@ -367,6 +372,11 @@ class EvaluationContext { return mergedAggregators; } + /** Returns the metrics container for this pipeline. 
*/ + public DirectMetrics getMetrics() { + return metrics; + } + @VisibleForTesting void forceRefresh() { watermarkManager.refreshAll(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index fab6a33..3761574 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -212,6 +212,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { TransformExecutor<T> callable = TransformExecutor.create( + evaluationContext, registry, enforcements, bundle, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java index 4972340..db92542 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java @@ -123,5 +123,15 @@ class ImmutableListBundleFactory implements BundleFactory { ImmutableList.copyOf(elements), getSynchronizedProcessingOutputWatermark()); } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public 
boolean equals(Object obj) { + return this == obj; + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java index 1829e4a..989109f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java @@ -22,10 +22,10 @@ import com.google.common.collect.ImmutableList; import java.util.Collection; import java.util.EnumSet; import java.util.Set; -import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; +import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; @@ -37,31 +37,6 @@ import org.joda.time.Instant; */ @AutoValue public abstract class StepTransformResult implements TransformResult { - @Override - public abstract AppliedPTransform<?, ?, ?> getTransform(); - - @Override - public abstract Iterable<? extends UncommittedBundle<?>> getOutputBundles(); - - @Override - public abstract Iterable<? 
extends WindowedValue<?>> getUnprocessedElements(); - - @Override - @Nullable - public abstract AggregatorContainer.Mutator getAggregatorChanges(); - - @Override - public abstract Instant getWatermarkHold(); - - @Nullable - @Override - public abstract CopyOnAccessInMemoryStateInternals<?> getState(); - - @Override - public abstract TimerUpdate getTimerUpdate(); - - @Override - public abstract Set<OutputType> getOutputTypes(); public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant watermarkHold) { return new Builder(transform, watermarkHold); @@ -71,6 +46,20 @@ public abstract class StepTransformResult implements TransformResult { return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE); } + @Override + public TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates) { + return new AutoValue_StepTransformResult( + getTransform(), + getOutputBundles(), + getUnprocessedElements(), + getAggregatorChanges(), + metricUpdates, + getWatermarkHold(), + getState(), + getTimerUpdate(), + getOutputTypes()); + } + /** * A builder for creating instances of {@link StepTransformResult}. 
*/ @@ -78,6 +67,7 @@ public abstract class StepTransformResult implements TransformResult { private final AppliedPTransform<?, ?, ?> transform; private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder; private final ImmutableList.Builder<WindowedValue<?>> unprocessedElementsBuilder; + private MetricUpdates metricUpdates; private CopyOnAccessInMemoryStateInternals<?> state; private TimerUpdate timerUpdate; private AggregatorContainer.Mutator aggregatorChanges; @@ -91,6 +81,7 @@ public abstract class StepTransformResult implements TransformResult { this.producedOutputs = EnumSet.noneOf(OutputType.class); this.unprocessedElementsBuilder = ImmutableList.builder(); this.timerUpdate = TimerUpdate.builder(null).build(); + this.metricUpdates = MetricUpdates.EMPTY; } public StepTransformResult build() { @@ -99,6 +90,7 @@ public abstract class StepTransformResult implements TransformResult { bundlesBuilder.build(), unprocessedElementsBuilder.build(), aggregatorChanges, + metricUpdates, watermarkHold, state, timerUpdate, @@ -110,6 +102,11 @@ public abstract class StepTransformResult implements TransformResult { return this; } + public Builder withMetricUpdates(MetricUpdates metricUpdates) { + this.metricUpdates = metricUpdates; + return this; + } + public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) { this.state = state; return this; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java index aaee9a5..03f615b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java +++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java @@ -25,6 +25,9 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue; @@ -38,6 +41,7 @@ import org.apache.beam.sdk.util.WindowedValue; */ class TransformExecutor<T> implements Runnable { public static <T> TransformExecutor<T> create( + EvaluationContext context, TransformEvaluatorFactory factory, Iterable<? extends ModelEnforcementFactory> modelEnforcements, CommittedBundle<T> inputBundle, @@ -45,6 +49,7 @@ class TransformExecutor<T> implements Runnable { CompletionCallback completionCallback, TransformExecutorService transformEvaluationState) { return new TransformExecutor<>( + context, factory, modelEnforcements, inputBundle, @@ -63,10 +68,12 @@ class TransformExecutor<T> implements Runnable { private final CompletionCallback onComplete; private final TransformExecutorService transformEvaluationState; + private final EvaluationContext context; private final AtomicReference<Thread> thread; private TransformExecutor( + EvaluationContext context, TransformEvaluatorFactory factory, Iterable<? 
extends ModelEnforcementFactory> modelEnforcements, CommittedBundle<T> inputBundle, @@ -82,11 +89,14 @@ class TransformExecutor<T> implements Runnable { this.onComplete = completionCallback; this.transformEvaluationState = transformEvaluationState; + this.context = context; this.thread = new AtomicReference<>(); } @Override public void run() { + MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName()); + MetricsEnvironment.setMetricsContainer(metricsContainer); checkState( thread.compareAndSet(null, Thread.currentThread()), "Tried to execute %s for %s on thread %s, but is already executing on thread %s", @@ -108,9 +118,9 @@ class TransformExecutor<T> implements Runnable { return; } - processElements(evaluator, enforcements); + processElements(evaluator, metricsContainer, enforcements); - finishBundle(evaluator, enforcements); + finishBundle(evaluator, metricsContainer, enforcements); } catch (Throwable t) { onComplete.handleThrowable(inputBundle, t); if (t instanceof RuntimeException) { @@ -118,6 +128,10 @@ class TransformExecutor<T> implements Runnable { } throw new RuntimeException(t); } finally { + // Report the physical metrics from the end of this step. + context.getMetrics().commitPhysical(inputBundle, metricsContainer.getCumulative()); + + MetricsEnvironment.unsetMetricsContainer(); transformEvaluationState.complete(this); } } @@ -127,7 +141,9 @@ class TransformExecutor<T> implements Runnable { * necessary {@link ModelEnforcement ModelEnforcements}. 
*/ private void processElements( - TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements) + TransformEvaluator<T> evaluator, + MetricsContainer metricsContainer, + Collection<ModelEnforcement<T>> enforcements) throws Exception { if (inputBundle != null) { for (WindowedValue<T> value : inputBundle.getElements()) { @@ -137,6 +153,13 @@ class TransformExecutor<T> implements Runnable { evaluator.processElement(value); + // Report the physical metrics after each element + MetricUpdates deltas = metricsContainer.getUpdates(); + if (deltas != null) { + context.getMetrics().updatePhysical(inputBundle, deltas); + metricsContainer.commitUpdates(); + } + for (ModelEnforcement<T> enforcement : enforcements) { enforcement.afterElement(value); } @@ -152,9 +175,11 @@ class TransformExecutor<T> implements Runnable { * {@link TransformEvaluator#finishBundle()} */ private TransformResult finishBundle( - TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements) + TransformEvaluator<T> evaluator, MetricsContainer metricsContainer, + Collection<ModelEnforcement<T>> enforcements) throws Exception { - TransformResult result = evaluator.finishBundle(); + TransformResult result = evaluator.finishBundle() + .withLogicalMetricUpdates(metricsContainer.getCumulative()); CommittedResult outputs = onComplete.handleResult(inputBundle, result); for (ModelEnforcement<T> enforcement : enforcements) { enforcement.afterFinish(inputBundle, result, outputs.getOutputs()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java index ba2d48e..ac1e395 100644 --- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java @@ -22,6 +22,7 @@ import javax.annotation.Nullable; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; +import org.apache.beam.sdk.metrics.MetricUpdates; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -57,6 +58,11 @@ public interface TransformResult { @Nullable AggregatorContainer.Mutator getAggregatorChanges(); /** + * Returns the logical metric updates. + */ + MetricUpdates getLogicalMetricUpdates(); + + /** * Returns the Watermark Hold for the transform at the time this result was produced. * * <p>If the transform does not set any watermark hold, returns @@ -86,4 +92,10 @@ public interface TransformResult { * {@link OutputType#BUNDLE}, as empty bundles may be dropped when the transform is committed. */ Set<OutputType> getOutputTypes(); + + /** + * Returns a new TransformResult based on this one but overwriting any existing logical metric + * updates with {@code metricUpdates}. 
+   */
+  TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates);
 }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
new file mode 100644
index 0000000..df01244
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DirectMetrics}: how logical and physical updates are aggregated and how
+ * queries filter the reported results.
+ */
+@RunWith(JUnit4.class)
+public class DirectMetricsTest {
+
+  private static final MetricName NAME1 = MetricName.named("ns1", "name1");
+  private static final MetricName NAME2 = MetricName.named("ns1", "name2");
+  private static final MetricName NAME3 = MetricName.named("ns2", "name1");
+
+  @Mock private CommittedBundle<Object> bundle1;
+  @Mock private CommittedBundle<Object> bundle2;
+
+  private DirectMetrics metrics = new DirectMetrics();
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * Two logical commits: counters reported for the same key are expected to be summed and the
+   * distribution data combined, while the other slot of every result stays at zero.
+   */
+  @Test
+  public void testApplyLogicalQueryNoFilter() {
+    MetricUpdates firstCommit = MetricUpdates.create(
+        ImmutableList.of(counter("step1", NAME1, 5L), counter("step1", NAME2, 8L)),
+        ImmutableList.of(distribution("step1", NAME1, DistributionData.create(8, 2, 3, 5))));
+    MetricUpdates secondCommit = MetricUpdates.create(
+        ImmutableList.of(counter("step2", NAME1, 7L), counter("step1", NAME2, 4L)),
+        ImmutableList.of(distribution("step1", NAME1, DistributionData.create(4, 1, 4, 4))));
+    metrics.commitLogical(bundle1, firstCommit);
+    metrics.commitLogical(bundle1, secondCommit);
+
+    MetricQueryResults unfiltered = metrics.queryMetrics(MetricsFilter.builder().build());
+
+    assertThat(unfiltered.counters(), containsInAnyOrder(
+        metricResult("ns1", "name1", "step1", 5L, 0L),
+        metricResult("ns1", "name2", "step1", 12L, 0L),
+        metricResult("ns1", "name1", "step2", 7L, 0L)));
+    assertThat(unfiltered.distributions(), contains(
+        metricResult("ns1", "name1", "step1",
+            DistributionResult.create(12, 3, 3, 5),
+            DistributionResult.ZERO)));
+  }
+
+  /**
+   * Physical counter updates queried with a namespace filter: only the "ns1" counters are
+   * expected back, with the first slot of each result left at zero.
+   */
+  @Test
+  public void testApplyPhysicalCountersQueryOneNamespace() {
+    metrics.updatePhysical(
+        bundle1, countersOnly(counter("step1", NAME1, 5L), counter("step1", NAME3, 8L)));
+    metrics.updatePhysical(
+        bundle1, countersOnly(counter("step2", NAME1, 7L), counter("step1", NAME3, 4L)));
+
+    MetricsFilter onlyNs1 = MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build();
+
+    assertThat(metrics.queryMetrics(onlyNs1).counters(), containsInAnyOrder(
+        metricResult("ns1", "name1", "step1", 0L, 5L),
+        metricResult("ns1", "name1", "step2", 0L, 7L)));
+  }
+
+  /**
+   * Physical counter updates queried by an enclosing step name: only the steps under "Outer1"
+   * are expected back, and "Outer1/Inner1" is expected to report its latest value of 12.
+   */
+  @Test
+  public void testApplyPhysicalQueryCompositeScope() {
+    metrics.updatePhysical(bundle1, countersOnly(
+        counter("Outer1/Inner1", NAME1, 5L), counter("Outer1/Inner2", NAME1, 8L)));
+    metrics.updatePhysical(bundle1, countersOnly(
+        counter("Outer1/Inner1", NAME1, 12L), counter("Outer2/Inner2", NAME1, 18L)));
+
+    MetricsFilter underOuter1 = MetricsFilter.builder().addStep("Outer1").build();
+
+    assertThat(metrics.queryMetrics(underOuter1).counters(), containsInAnyOrder(
+        metricResult("ns1", "name1", "Outer1/Inner1", 0L, 12L),
+        metricResult("ns1", "name1", "Outer1/Inner2", 0L, 8L)));
+  }
+
+  /** Creates a counter update of {@code value} for metric {@code name} in {@code step}. */
+  private static MetricUpdate<Long> counter(String step, MetricName name, long value) {
+    return MetricUpdate.create(MetricKey.create(step, name), value);
+  }
+
+  /** Creates a distribution update of {@code data} for metric {@code name} in {@code step}. */
+  private static MetricUpdate<DistributionData> distribution(
+      String step, MetricName name, DistributionData data) {
+    return MetricUpdate.create(MetricKey.create(step, name), data);
+  }
+
+  /** Bundles two counter updates into a {@link MetricUpdates} without distribution updates. */
+  private static MetricUpdates countersOnly(MetricUpdate<Long> first, MetricUpdate<Long> second) {
+    return MetricUpdates.create(
+        ImmutableList.of(first, second),
+        ImmutableList.<MetricUpdate<DistributionData>>of());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 4768fb0..d93dd7a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
@@ -35,12 +37,20 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -444,4 +454,38 @@ public class DirectRunnerTest implements Serializable {
       throw new CoderException("Cannot decode a long");
     }
   }
+
+  /**
+   * Runs a three-element pipeline through a {@code ParDo} named "MyStep" that increments a
+   * counter and updates a distribution once per element, then checks the metrics reported by
+   * the {@link PipelineResult} for this class's namespace.
+   */
+  @Test
+  public void testMetrics() throws Exception {
+    Pipeline pipeline = getPipeline();
+    pipeline
+        .apply(Create.of(5, 8, 13))
+        .apply("MyStep", ParDo.of(new DoFn<Integer, Void>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Counter count = Metrics.counter(DirectRunnerTest.class, "count");
+            Distribution values = Metrics.distribution(DirectRunnerTest.class, "input");
+
+            count.inc();
+            values.update(c.element());
+          }
+        }));
+    PipelineResult result = pipeline.run();
+    MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
+        .addNameFilter(MetricNameFilter.inNamespace(DirectRunnerTest.class))
+        .build());
+    // One increment per input element, so the counter is expected to read 3.
+    assertThat(metrics.counters(), contains(
+        metricResult(DirectRunnerTest.class.getName(), "count", "MyStep", 3L, 3L)));
+    // The inputs 5, 8 and 13 sum to 26 over 3 updates, with minimum 5 and maximum 13.
+    assertThat(metrics.distributions(), contains(
+        metricResult(DirectRunnerTest.class.getName(), "input", "MyStep",
+            DistributionResult.create(26L, 3L, 5L, 13L),
+            DistributionResult.create(26L, 3L, 5L, 13L))));
+  }
 }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index c63e9bd..5015e5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -72,6 +72,7 @@ public class TransformExecutorTest {
   private RegisteringCompletionCallback completionCallback;
   private TransformExecutorService transformEvaluationState;
   private BundleFactory bundleFactory;
+  @Mock private DirectMetrics metrics;
   @Mock private EvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
 
@@ -90,6 +91,8 @@ public class TransformExecutorTest {
     TestPipeline p = TestPipeline.create();
     created = p.apply(Create.of("foo", "spam", "third"));
     downstream = created.apply(WithKeys.<Integer, String>of(3));
+
+    when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
 
   @Test
@@ -116,6 +119,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
+
evaluationContext, registry, Collections.<ModelEnforcementFactory>emptyList(), null, @@ -135,6 +139,7 @@ public class TransformExecutorTest { TransformExecutor<Object> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>emptyList(), null, @@ -177,6 +182,7 @@ public class TransformExecutorTest { TransformExecutor<String> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>emptyList(), inputBundle, @@ -219,6 +225,7 @@ public class TransformExecutorTest { TransformExecutor<String> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>emptyList(), inputBundle, @@ -254,6 +261,7 @@ public class TransformExecutorTest { TransformExecutor<String> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>emptyList(), inputBundle, @@ -294,6 +302,7 @@ public class TransformExecutorTest { TransformExecutor<String> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>emptyList(), null, @@ -335,6 +344,7 @@ public class TransformExecutorTest { TestEnforcementFactory enforcement = new TestEnforcementFactory(); TransformExecutor<String> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>singleton(enforcement), inputBundle, @@ -392,6 +402,7 @@ public class TransformExecutorTest { TransformExecutor<byte[]> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), inputBundle, @@ -448,6 +459,7 @@ public class TransformExecutorTest { TransformExecutor<byte[]> executor = TransformExecutor.create( + evaluationContext, registry, Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), inputBundle,