Repository: beam Updated Branches: refs/heads/master db0ec9991 -> 019d3002b
[BEAM-1672] Use Accumulable MetricsContainers in Flink runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8c2da9ad Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8c2da9ad Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8c2da9ad Branch: refs/heads/master Commit: 8c2da9ad1b8c195757f97feccdbcabcad735c407 Parents: 009cd6e Author: Aviem Zur <aviem...@gmail.com> Authored: Fri May 5 23:14:01 2017 +0300 Committer: Aviem Zur <aviem...@gmail.com> Committed: Sat May 6 08:27:49 2017 +0300 ---------------------------------------------------------------------- .../beam/runners/flink/FlinkRunnerResult.java | 8 +- .../metrics/DoFnRunnerWithMetricsUpdate.java | 12 +- .../flink/metrics/FlinkMetricContainer.java | 273 ++++++------------- .../flink/metrics/FlinkMetricResults.java | 146 ---------- .../flink/metrics/MetricsAccumulator.java | 60 ++++ .../flink/metrics/ReaderInvocationUtil.java | 7 +- .../translation/wrappers/SourceInputFormat.java | 8 +- .../streaming/io/BoundedSourceWrapper.java | 8 +- .../streaming/io/UnboundedSourceWrapper.java | 9 +- 9 files changed, 174 insertions(+), 357 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 90dc79b..038895a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -17,12 +17,15 @@ */ package org.apache.beam.runners.flink; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import java.io.IOException; import java.util.Collections; import java.util.Map; -import org.apache.beam.runners.flink.metrics.FlinkMetricResults; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; import org.joda.time.Duration; /** @@ -72,6 +75,7 @@ public class FlinkRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - return new FlinkMetricResults(accumulators); + return asAttemptedOnlyMetricResults( + (MetricsContainerStepMap) accumulators.get(FlinkMetricContainer.ACCUMULATOR_NAME)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java index dae91fe..40191d2 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -34,6 +34,7 @@ import org.joda.time.Instant; */ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { + private final String stepName; private final FlinkMetricContainer container; private final DoFnRunner<InputT, OutputT> delegate; @@ -41,14 +42,15 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< String stepName, DoFnRunner<InputT, OutputT> delegate, RuntimeContext runtimeContext) { + this.stepName = stepName; this.delegate = delegate; - container = new FlinkMetricContainer(stepName, runtimeContext); + container = new FlinkMetricContainer(runtimeContext); } @Override public void startBundle() { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.startBundle(); } catch (IOException e) { throw new RuntimeException(e); @@ -58,7 +60,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< @Override public void processElement(final WindowedValue<InputT> elem) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.processElement(elem); } catch (IOException e) { throw new RuntimeException(e); @@ -69,7 +71,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp, final TimeDomain timeDomain) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.onTimer(timerId, window, timestamp, timeDomain); } catch (IOException e) { throw new RuntimeException(e); @@ -79,7 +81,7 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> implements DoFnRunner< @Override public void finishBundle() { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { delegate.finishBundle(); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index d020f69..f81205e 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -17,19 +17,24 @@ */ package org.apache.beam.runners.flink.metrics; +import static org.apache.beam.sdk.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; + import java.util.HashMap; import java.util.Map; -import org.apache.beam.sdk.metrics.DistributionData; -import org.apache.beam.sdk.metrics.GaugeData; -import org.apache.beam.sdk.metrics.MetricKey; -import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Helper class for holding a {@link MetricsContainer} and forwarding Beam metrics to @@ -37,46 +42,61 @@ import org.apache.flink.metrics.Gauge; */ public class FlinkMetricContainer { + public static final String ACCUMULATOR_NAME = "__metricscontainers"; + + private static final Logger LOG = LoggerFactory.getLogger(FlinkMetricContainer.class); + private static final String METRIC_KEY_SEPARATOR = "__"; - static final String COUNTER_PREFIX = "__counter"; - static final String DISTRIBUTION_PREFIX = "__distribution"; - static final String GAUGE_PREFIX = "__gauge"; + private static final String COUNTER_PREFIX = "__counter"; + private static final String DISTRIBUTION_PREFIX = "__distribution"; + private static final String GAUGE_PREFIX = "__gauge"; - private final MetricsContainer metricsContainer; private final RuntimeContext runtimeContext; private final Map<String, Counter> flinkCounterCache; private final Map<String, FlinkDistributionGauge> flinkDistributionGaugeCache; private final Map<String, FlinkGauge> flinkGaugeCache; + private final MetricsAccumulator metricsAccumulator; - public FlinkMetricContainer(String stepName, RuntimeContext runtimeContext) { - metricsContainer = new MetricsContainer(stepName); + public FlinkMetricContainer(RuntimeContext runtimeContext) { this.runtimeContext = runtimeContext; - flinkCounterCache = new HashMap<>(); - flinkDistributionGaugeCache = new HashMap<>(); - flinkGaugeCache = new HashMap<>(); + this.flinkCounterCache = new HashMap<>(); + this.flinkDistributionGaugeCache = new HashMap<>(); + this.flinkGaugeCache = new HashMap<>(); + + Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> metricsAccumulator = + runtimeContext.getAccumulator(ACCUMULATOR_NAME); + if (metricsAccumulator == null) { + metricsAccumulator = new MetricsAccumulator(); + try { + runtimeContext.addAccumulator(ACCUMULATOR_NAME, metricsAccumulator); + } catch (Exception e) { + LOG.error("Failed to create metrics accumulator.", e); + } + } + this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator; } - public MetricsContainer getMetricsContainer() { - return metricsContainer; + MetricsContainer getMetricsContainer(String stepName) { + return metricsAccumulator != null + ? metricsAccumulator.getLocalValue().getContainer(stepName) + : null; } - public void updateMetrics() { - // update metrics - MetricUpdates updates = metricsContainer.getUpdates(); - if (updates != null) { - updateCounters(updates.counterUpdates()); - updateDistributions(updates.distributionUpdates()); - updateGauge(updates.gaugeUpdates()); - metricsContainer.commitUpdates(); - } + void updateMetrics() { + MetricResults metricResults = + asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue()); + MetricQueryResults metricQueryResults = + metricResults.queryMetrics(MetricsFilter.builder().build()); + updateCounters(metricQueryResults.counters()); + updateDistributions(metricQueryResults.distributions()); + updateGauge(metricQueryResults.gauges()); } - private void updateCounters(Iterable<MetricUpdates.MetricUpdate<Long>> updates) { - - for (MetricUpdates.MetricUpdate<Long> metricUpdate : updates) { + private void updateCounters(Iterable<MetricResult<Long>> counters) { + for (MetricResult<Long> metricResult : counters) { + String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricResult); - String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey()); - Long update = metricUpdate.getUpdate(); + Long update = metricResult.attempted(); // update flink metric Counter counter = flinkCounterCache.get(flinkMetricName); @@ -86,26 +106,15 @@ public class FlinkMetricContainer { } counter.dec(counter.getCount()); counter.inc(update); - - // update flink accumulator - Accumulator<Long, Long> accumulator = runtimeContext.getAccumulator(flinkMetricName); - if (accumulator == null) { - accumulator = new LongCounter(update); - runtimeContext.addAccumulator(flinkMetricName, accumulator); - } else { - accumulator.resetLocal(); - accumulator.add(update); - } } } - private void updateDistributions(Iterable<MetricUpdates.MetricUpdate<DistributionData>> updates) { - - for (MetricUpdates.MetricUpdate<DistributionData> metricUpdate : updates) { - + private void updateDistributions(Iterable<MetricResult<DistributionResult>> distributions) { + for (MetricResult<DistributionResult> metricResult : distributions) { String flinkMetricName = - getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricUpdate.getKey()); - DistributionData update = metricUpdate.getUpdate(); + getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricResult); + + DistributionResult update = metricResult.attempted(); // update flink metric FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName); @@ -116,26 +125,15 @@ public class FlinkMetricContainer { } else { gauge.update(update); } - - // update flink accumulator - Accumulator<DistributionData, DistributionData> accumulator = - runtimeContext.getAccumulator(flinkMetricName); - if (accumulator == null) { - accumulator = new FlinkDistributionDataAccumulator(update); - runtimeContext.addAccumulator(flinkMetricName, accumulator); - } else { - accumulator.resetLocal(); - accumulator.add(update); - } } } - private void updateGauge(Iterable<MetricUpdates.MetricUpdate<GaugeData>> updates) { - for (MetricUpdates.MetricUpdate<GaugeData> metricUpdate : updates) { - + private void updateGauge(Iterable<MetricResult<GaugeResult>> gauges) { + for (MetricResult<GaugeResult> metricResult : gauges) { String flinkMetricName = - getFlinkMetricNameString(GAUGE_PREFIX, metricUpdate.getKey()); - GaugeData update = metricUpdate.getUpdate(); + getFlinkMetricNameString(GAUGE_PREFIX, metricResult); + + GaugeResult update = metricResult.attempted(); // update flink metric FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName); @@ -146,170 +144,55 @@ public class FlinkMetricContainer { } else { gauge.update(update); } - - // update flink accumulator - Accumulator<GaugeData, GaugeData> accumulator = - runtimeContext.getAccumulator(flinkMetricName); - if (accumulator == null) { - accumulator = new FlinkGaugeAccumulator(update); - runtimeContext.addAccumulator(flinkMetricName, accumulator); - } - accumulator.resetLocal(); - accumulator.add(update); } } - private static String getFlinkMetricNameString(String prefix, MetricKey key) { + private static String getFlinkMetricNameString(String prefix, MetricResult<?> metricResult) { return prefix - + METRIC_KEY_SEPARATOR + key.stepName() - + METRIC_KEY_SEPARATOR + key.metricName().namespace() - + METRIC_KEY_SEPARATOR + key.metricName().name(); - } - - static MetricKey parseMetricKey(String flinkMetricName) { - String[] arr = flinkMetricName.split(METRIC_KEY_SEPARATOR); - return MetricKey.create(arr[2], MetricName.named(arr[3], arr[4])); + + METRIC_KEY_SEPARATOR + metricResult.step() + + METRIC_KEY_SEPARATOR + metricResult.name().namespace() + + METRIC_KEY_SEPARATOR + metricResult.name().name(); } /** - * Flink {@link Gauge} for {@link DistributionData}. + * Flink {@link Gauge} for {@link DistributionResult}. */ - public static class FlinkDistributionGauge implements Gauge<DistributionData> { + public static class FlinkDistributionGauge implements Gauge<DistributionResult> { - DistributionData data; + DistributionResult data; - FlinkDistributionGauge(DistributionData data) { + FlinkDistributionGauge(DistributionResult data) { this.data = data; } - void update(DistributionData data) { + void update(DistributionResult data) { this.data = data; } @Override - public DistributionData getValue() { + public DistributionResult getValue() { return data; } } /** - * Flink {@link Gauge} for {@link GaugeData}. + * Flink {@link Gauge} for {@link GaugeResult}. */ - public static class FlinkGauge implements Gauge<GaugeData> { + public static class FlinkGauge implements Gauge<GaugeResult> { - GaugeData data; + GaugeResult data; - FlinkGauge(GaugeData data) { + FlinkGauge(GaugeResult data) { this.data = data; } - void update(GaugeData update) { - this.data = data.combine(update); + void update(GaugeResult update) { + this.data = update; } @Override - public GaugeData getValue() { + public GaugeResult getValue() { return data; } } - - /** - * Flink {@link Accumulator} for {@link GaugeData}. - */ - public static class FlinkDistributionDataAccumulator implements - Accumulator<DistributionData, DistributionData> { - - private static final long serialVersionUID = 1L; - - private DistributionData data; - - public FlinkDistributionDataAccumulator(DistributionData data) { - this.data = data; - } - - @Override - public void add(DistributionData value) { - if (data == null) { - this.data = value; - } else { - this.data = this.data.combine(value); - } - } - - @Override - public DistributionData getLocalValue() { - return data; - } - - @Override - public void resetLocal() { - data = null; - } - - @Override - public void merge(Accumulator<DistributionData, DistributionData> other) { - data = data.combine(other.getLocalValue()); - } - - @Override - public Accumulator<DistributionData, DistributionData> clone() { - try { - super.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - - return new FlinkDistributionDataAccumulator( - DistributionData.create(data.sum(), data.count(), data.min(), data.max())); - } - } - - /** - * Flink {@link Accumulator} for {@link GaugeData}. - */ - public static class FlinkGaugeAccumulator implements Accumulator<GaugeData, GaugeData> { - - private GaugeData data; - - public FlinkGaugeAccumulator(GaugeData data) { - this.data = data; - } - - @Override - public void add(GaugeData value) { - if (data == null) { - this.data = value; - } else { - this.data = this.data.combine(value); - } - } - - @Override - public GaugeData getLocalValue() { - return data; - } - - @Override - public void resetLocal() { - this.data = null; - } - - @Override - public void merge(Accumulator<GaugeData, GaugeData> other) { - data = data.combine(other.getLocalValue()); - } - - @Override - public Accumulator<GaugeData, GaugeData> clone() { - try { - super.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - - return new FlinkGaugeAccumulator( - GaugeData.create(data.value())); - } - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java deleted file mode 100644 index 9e1430b..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.metrics; - - -import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.COUNTER_PREFIX; -import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.DISTRIBUTION_PREFIX; -import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.GAUGE_PREFIX; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.metrics.DistributionData; -import org.apache.beam.sdk.metrics.DistributionResult; -import org.apache.beam.sdk.metrics.GaugeData; -import org.apache.beam.sdk.metrics.GaugeResult; -import org.apache.beam.sdk.metrics.MetricFiltering; -import org.apache.beam.sdk.metrics.MetricKey; -import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricQueryResults; -import org.apache.beam.sdk.metrics.MetricResult; -import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsFilter; - -/** - * Implementation of {@link MetricResults} for the Flink Runner. - */ -public class FlinkMetricResults extends MetricResults { - - private Map<String, Object> accumulators; - - public FlinkMetricResults(Map<String, Object> accumulators) { - this.accumulators = accumulators; - } - - @Override - public MetricQueryResults queryMetrics(MetricsFilter filter) { - return new FlinkMetricQueryResults(filter); - } - - private class FlinkMetricQueryResults implements MetricQueryResults { - - private MetricsFilter filter; - - FlinkMetricQueryResults(MetricsFilter filter) { - this.filter = filter; - } - - @Override - public Iterable<MetricResult<Long>> counters() { - List<MetricResult<Long>> result = new ArrayList<>(); - for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { - if (accumulator.getKey().startsWith(COUNTER_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); - if (MetricFiltering.matches(filter, metricKey)) { - result.add(new FlinkMetricResult<>( - metricKey.metricName(), metricKey.stepName(), (Long) accumulator.getValue())); - } - } - } - return result; - } - - @Override - public Iterable<MetricResult<DistributionResult>> distributions() { - List<MetricResult<DistributionResult>> result = new ArrayList<>(); - for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { - if (accumulator.getKey().startsWith(DISTRIBUTION_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); - DistributionData data = (DistributionData) accumulator.getValue(); - if (MetricFiltering.matches(filter, metricKey)) { - result.add(new FlinkMetricResult<>( - metricKey.metricName(), metricKey.stepName(), data.extractResult())); - } - } - } - return result; - } - - @Override - public Iterable<MetricResult<GaugeResult>> gauges() { - List<MetricResult<GaugeResult>> result = new ArrayList<>(); - for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { - if (accumulator.getKey().startsWith(GAUGE_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); - GaugeData data = (GaugeData) accumulator.getValue(); - if (MetricFiltering.matches(filter, metricKey)) { - result.add(new FlinkMetricResult<>( - metricKey.metricName(), metricKey.stepName(), data.extractResult())); - } - } - } - return result; - } - - } - - private static class FlinkMetricResult<T> implements MetricResult<T> { - private final MetricName name; - private final String step; - private final T result; - - FlinkMetricResult(MetricName name, String step, T result) { - this.name = name; - this.step = step; - this.result = result; - } - - @Override - public MetricName name() { - return name; - } - - @Override - public String step() { - return step; - } - - @Override - public T committed() { - throw new UnsupportedOperationException("Flink runner does not currently support committed" - + " metrics results. Please use 'attempted' instead."); - } - - @Override - public T attempted() { - return result; - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java new file mode 100644 index 0000000..a9dc2ce --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/MetricsAccumulator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import org.apache.beam.sdk.metrics.MetricsContainerStepMap; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.SimpleAccumulator; + +/** + * Accumulator of {@link MetricsContainerStepMap}. + */ +public class MetricsAccumulator implements SimpleAccumulator<MetricsContainerStepMap> { + private MetricsContainerStepMap metricsContainers = new MetricsContainerStepMap(); + + @Override + public void add(MetricsContainerStepMap value) { + metricsContainers.updateAll(value); + } + + @Override + public MetricsContainerStepMap getLocalValue() { + return metricsContainers; + } + + @Override + public void resetLocal() { + this.metricsContainers = new MetricsContainerStepMap(); + } + + @Override + public void merge(Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> other) { + this.add(other.getLocalValue()); + } + + @Override + public Accumulator<MetricsContainerStepMap, MetricsContainerStepMap> clone() { + try { + super.clone(); + } catch (CloneNotSupportedException ignored) { + } + MetricsAccumulator metricsAccumulator = new MetricsAccumulator(); + metricsAccumulator.getLocalValue().updateAll(this.getLocalValue()); + return metricsAccumulator; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java index 38263d9..64738cc 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -32,13 +32,16 @@ import org.apache.beam.sdk.options.PipelineOptions; */ public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT>> { + private final String stepName; private final FlinkMetricContainer container; private final Boolean enableMetrics; public ReaderInvocationUtil( + String stepName, PipelineOptions options, FlinkMetricContainer container) { FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class); + this.stepName = stepName; enableMetrics = flinkPipelineOptions.getEnableMetrics(); this.container = container; } @@ -46,7 +49,7 @@ public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT public boolean invokeStart(ReaderT reader) throws IOException { if (enableMetrics) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { boolean result = reader.start(); container.updateMetrics(); return result; @@ -59,7 +62,7 @@ public class ReaderInvocationUtil<OutputT, ReaderT extends Source.Reader<OutputT public boolean invokeAdvance(ReaderT reader) throws IOException { if (enableMetrics) { try (Closeable ignored = - MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) { boolean result = reader.advance(); container.updateMetrics(); return result; http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index f2b81fc..27e6912 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -71,9 +71,13 @@ public class SourceInputFormat<T> @Override public void open(SourceInputSplit<T> sourceInputSplit) throws IOException { - FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext()); + readerInvoker = - new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + new ReaderInvocationUtil<>( + stepName, + serializedOptions.getPipelineOptions(), + metricContainer); reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options); inputAvailable = readerInvoker.invokeStart(reader); http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index a142685..6d75688 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -104,9 +104,13 @@ public class BoundedSourceWrapper<OutputT> numSubtasks, localSources); - FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext()); + ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker = - new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + new ReaderInvocationUtil<>( + stepName, + serializedOptions.getPipelineOptions(), + metricContainer); readers = new ArrayList<>(); // initialize readers from scratch http://git-wip-us.apache.org/repos/asf/beam/blob/8c2da9ad/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index ee20fd5..b9c431d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -214,10 +214,13 @@ public class UnboundedSourceWrapper< context = ctx; - FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); - ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker = - new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext()); + ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker = + new ReaderInvocationUtil<>( + stepName, + serializedOptions.getPipelineOptions(), + metricContainer); if (localReaders.size() == 0) { // do nothing, but still look busy ...