Repository: beam Updated Branches: refs/heads/master ae72456b7 -> 19105d9be
[BEAM-1764] Remove aggregators from Flink Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8e94f8f8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8e94f8f8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8e94f8f8 Branch: refs/heads/master Commit: 8e94f8f827d3fa5cd94f7fffcaf0a7730df86587 Parents: ae72456 Author: Ismaël Mejía <ieme...@apache.org> Authored: Tue May 2 15:59:26 2017 +0200 Committer: Ismaël Mejía <ieme...@apache.org> Committed: Tue May 2 18:11:42 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/runners/flink/FlinkRunner.java | 3 +- .../beam/runners/flink/FlinkRunnerResult.java | 15 ++- .../flink/metrics/FlinkMetricResults.java | 30 +++--- .../functions/FlinkAggregatorFactory.java | 53 ----------- .../functions/FlinkDoFnFunction.java | 2 +- .../functions/FlinkStatefulDoFnFunction.java | 2 +- .../SerializableFnAggregatorWrapper.java | 98 -------------------- .../wrappers/streaming/DoFnOperator.java | 27 +----- 8 files changed, 26 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index 096f030..181ffda 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -126,8 +126,7 @@ public class FlinkRunner extends PipelineRunner<PipelineResult> { LOG.info("Execution finished in {} msecs", result.getNetRuntime()); Map<String, Object> accumulators = 
result.getAllAccumulatorResults(); if (accumulators != 
null && !accumulators.isEmpty()) { - LOG.info("Final aggregator values:"); - + LOG.info("Final accumulator values:"); for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) { LOG.info("{} : {}", entry.getKey(), entry.getValue()); } http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index dfc1d8e..90dc79b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -27,19 +27,18 @@ import org.joda.time.Duration; /** * Result of executing a {@link org.apache.beam.sdk.Pipeline} with Flink. This - * has methods to query to job runtime and the final values of - * {@link org.apache.beam.sdk.transforms.Aggregator}s. + * has methods to query to job runtime and the final values of the accumulators. */ public class FlinkRunnerResult implements PipelineResult { - private final Map<String, Object> aggregators; + private final Map<String, Object> accumulators; private final long runtime; - FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { - this.aggregators = (aggregators == null || aggregators.isEmpty()) + FlinkRunnerResult(Map<String, Object> accumulators, long runtime) { + this.accumulators = (accumulators == null || accumulators.isEmpty()) ? 
Collections.<String, Object>emptyMap() - : Collections.unmodifiableMap(aggregators); + : Collections.unmodifiableMap(accumulators); this.runtime = runtime; } @@ -51,7 +50,7 @@ public class FlinkRunnerResult implements PipelineResult { @Override public String toString() { return "FlinkRunnerResult{" - + "aggregators=" + aggregators + + "accumulators=" + accumulators + ", runtime=" + runtime + '}'; } @@ -73,6 +72,6 @@ public class FlinkRunnerResult implements PipelineResult { @Override public MetricResults metrics() { - return new FlinkMetricResults(aggregators); + return new FlinkMetricResults(accumulators); } } http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java index 263a68e..9e1430b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java @@ -42,10 +42,10 @@ import org.apache.beam.sdk.metrics.MetricsFilter; */ public class FlinkMetricResults extends MetricResults { - private Map<String, Object> aggregators; + private Map<String, Object> accumulators; - public FlinkMetricResults(Map<String, Object> aggregators) { - this.aggregators = aggregators; + public FlinkMetricResults(Map<String, Object> accumulators) { + this.accumulators = accumulators; } @Override @@ -64,12 +64,12 @@ public class FlinkMetricResults extends MetricResults { @Override public Iterable<MetricResult<Long>> counters() { List<MetricResult<Long>> result = new ArrayList<>(); - for (Map.Entry<String, Object> entry : aggregators.entrySet()) { - if (entry.getKey().startsWith(COUNTER_PREFIX)) { - MetricKey 
metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); + for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { + if (accumulator.getKey().startsWith(COUNTER_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); if (MetricFiltering.matches(filter, metricKey)) { result.add(new FlinkMetricResult<>( - metricKey.metricName(), metricKey.stepName(), (Long) entry.getValue())); + metricKey.metricName(), metricKey.stepName(), (Long) accumulator.getValue())); } } } @@ -79,10 +79,10 @@ public class FlinkMetricResults extends MetricResults { @Override public Iterable<MetricResult<DistributionResult>> distributions() { List<MetricResult<DistributionResult>> result = new ArrayList<>(); - for (Map.Entry<String, Object> entry : aggregators.entrySet()) { - if (entry.getKey().startsWith(DISTRIBUTION_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); - DistributionData data = (DistributionData) entry.getValue(); + for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { + if (accumulator.getKey().startsWith(DISTRIBUTION_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); + DistributionData data = (DistributionData) accumulator.getValue(); if (MetricFiltering.matches(filter, metricKey)) { result.add(new FlinkMetricResult<>( metricKey.metricName(), metricKey.stepName(), data.extractResult())); @@ -95,10 +95,10 @@ public class FlinkMetricResults extends MetricResults { @Override public Iterable<MetricResult<GaugeResult>> gauges() { List<MetricResult<GaugeResult>> result = new ArrayList<>(); - for (Map.Entry<String, Object> entry : aggregators.entrySet()) { - if (entry.getKey().startsWith(GAUGE_PREFIX)) { - MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); - GaugeData data = (GaugeData) entry.getValue(); + for (Map.Entry<String, Object> accumulator : accumulators.entrySet()) { + if 
(accumulator.getKey().startsWith(GAUGE_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(accumulator.getKey()); + GaugeData data = (GaugeData) accumulator.getValue(); if (MetricFiltering.matches(filter, metricKey)) { result.add(new FlinkMetricResult<>( metricKey.metricName(), metricKey.stepName(), data.extractResult())); http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java deleted file mode 100644 index fb2493b..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.functions; - -import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.flink.api.common.functions.RuntimeContext; - -/** - * A {@link AggregatorFactory} for the Flink Batch Runner. - */ -public class FlinkAggregatorFactory implements AggregatorFactory{ - - private final RuntimeContext runtimeContext; - - public FlinkAggregatorFactory(RuntimeContext runtimeContext) { - this.runtimeContext = runtimeContext; - } - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName, - Combine.CombineFn<InputT, AccumT, OutputT> combine) { - @SuppressWarnings("unchecked") - SerializableFnAggregatorWrapper<InputT, OutputT> result = - (SerializableFnAggregatorWrapper<InputT, OutputT>) - runtimeContext.getAccumulator(aggregatorName); - - if (result == null) { - result = new SerializableFnAggregatorWrapper<>(combine); - runtimeContext.addAccumulator(aggregatorName, result); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 68ac780..d28e7c4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -105,7 +105,7 @@ public class FlinkDoFnFunction<InputT, OutputT> // see SimpleDoFnRunner, just use it to limit number of additional outputs Collections.<TupleTag<?>>emptyList(), new FlinkNoOpStepContext(), - new FlinkAggregatorFactory(runtimeContext), + null, windowingStrategy); if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 3e02bee..879fad7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -131,7 +131,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> return timerInternals; } }, - new FlinkAggregatorFactory(runtimeContext), + null, windowingStrategy); if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java deleted file mode 100644 index 70d97e3..0000000 
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.Serializable; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.flink.api.common.accumulators.Accumulator; - -/** - * Wrapper that wraps a {@link org.apache.beam.sdk.transforms.Combine.CombineFn} - * in a Flink {@link org.apache.flink.api.common.accumulators.Accumulator} for using - * the function as an aggregator in a {@link org.apache.beam.sdk.transforms.ParDo} - * operation. 
- */ -public class SerializableFnAggregatorWrapper<InputT, OutputT> - implements Aggregator<InputT, OutputT>, Accumulator<InputT, Serializable> { - - private OutputT aa; - private Combine.CombineFn<InputT, ?, OutputT> combiner; - - public SerializableFnAggregatorWrapper(Combine.CombineFn<InputT, ?, OutputT> combiner) { - this.combiner = combiner; - resetLocal(); - } - - @Override - @SuppressWarnings("unchecked") - public void add(InputT value) { - this.aa = combiner.apply(ImmutableList.of((InputT) aa, value)); - } - - @Override - public Serializable getLocalValue() { - return (Serializable) aa; - } - - @Override - public void resetLocal() { - this.aa = combiner.apply(ImmutableList.<InputT>of()); - } - - @Override - @SuppressWarnings("unchecked") - public void merge(Accumulator<InputT, Serializable> other) { - this.aa = combiner.apply(ImmutableList.of((InputT) aa, (InputT) other.getLocalValue())); - } - - @Override - public void addValue(InputT value) { - add(value); - } - - @Override - public String getName() { - return "Aggregator :" + combiner.toString(); - } - - @Override - public Combine.CombineFn<InputT, ?, OutputT> getCombineFn() { - return combiner; - } - - @Override - public Accumulator<InputT, Serializable> clone() { - try { - super.clone(); - } catch (CloneNotSupportedException e) { - // Flink Accumulators cannot throw CloneNotSupportedException, work around that. 
- throw new RuntimeException(e); - } - - // copy it by merging - OutputT resultCopy = combiner.apply(Lists.newArrayList((InputT) aa)); - SerializableFnAggregatorWrapper<InputT, OutputT> result = new - SerializableFnAggregatorWrapper<>(combiner); - - result.aa = resultCopy; - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/8e94f8f8/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 54eb770..01830de 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -30,7 +30,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; @@ -51,7 +50,6 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals; import 
org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals; @@ -59,8 +57,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkS import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -205,27 +201,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> currentInputWatermark = Long.MIN_VALUE; currentOutputWatermark = Long.MIN_VALUE; - AggregatorFactory aggregatorFactory = new AggregatorFactory() { - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, - ExecutionContext.StepContext stepContext, - String aggregatorName, - Combine.CombineFn<InputT, AccumT, OutputT> combine) { - - @SuppressWarnings("unchecked") - SerializableFnAggregatorWrapper<InputT, OutputT> result = - (SerializableFnAggregatorWrapper<InputT, OutputT>) - getRuntimeContext().getAccumulator(aggregatorName); - - if (result == null) { - result = new SerializableFnAggregatorWrapper<>(combine); - getRuntimeContext().addAccumulator(aggregatorName, result); - } - return result; - } - }; - sideInputReader = NullSideInputReader.of(sideInputs); if (!sideInputs.isEmpty()) { @@ -285,7 +260,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> mainOutputTag, additionalOutputTags, stepContext, - aggregatorFactory, + null, windowingStrategy); if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) {