Repository: incubator-beam Updated Branches: refs/heads/master d624d3b6b -> 5ebbd500c
[BEAM-362] Port runners to runners-core AggregatorFactory Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/55f04955 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/55f04955 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/55f04955 Branch: refs/heads/master Commit: 55f0495583312c9c0dea620d6a4e85193e97f255 Parents: d624d3b Author: Kenneth Knowles <k...@google.com> Authored: Thu Dec 15 21:06:14 2016 -0800 Committer: Sela <ans...@paypal.com> Committed: Fri Dec 16 11:46:18 2016 +0200 ---------------------------------------------------------------------- .../runners/apex/translation/operators/ApexParDoOperator.java | 2 +- .../src/main/java/org/apache/beam/runners/core/DoFnRunners.java | 1 - .../java/org/apache/beam/runners/core/SimpleDoFnRunner.java | 1 - .../java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java | 1 - .../org/apache/beam/runners/direct/AggregatorContainer.java | 2 +- .../flink/translation/wrappers/streaming/DoFnOperator.java | 3 ++- .../apache/beam/runners/spark/aggregators/SparkAggregators.java | 5 +++-- 7 files changed, 7 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 1e76949..4538fb5 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -37,6 +37,7 @@ 
import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -46,7 +47,6 @@ import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index da16573..0e4bf75 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -22,7 +22,6 @@ import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 041cdde..d504b40 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -30,7 +30,6 @@ import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 10af29a..7d93200 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -28,7 +28,6 @@ import java.util.Set; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; -import 
org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java index e86bc3e..c7fa4df 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java @@ -27,8 +27,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.util.ExecutionContext; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 87b15a7..001e3b6 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -28,6 +28,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -192,7 +193,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> currentInputWatermark = Long.MIN_VALUE; currentOutputWatermark = currentInputWatermark; - Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() { + AggregatorFactory aggregatorFactory = new AggregatorFactory() { @Override public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( Class<?> fnClass, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/55f04955/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java index 657264f..17d5844 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/SparkAggregators.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.spark.aggregators; import com.google.common.collect.ImmutableList; import java.util.Collection; import java.util.Map; +import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.spark.translation.SparkRuntimeContext; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.transforms.Aggregator; @@ -99,9 +100,9 @@ public class SparkAggregators { } /** - * An implementation of 
{@link Aggregator.AggregatorFactory} for the SparkRunner. + * An implementation of {@link AggregatorFactory} for the SparkRunner. */ - public static class Factory implements Aggregator.AggregatorFactory { + public static class Factory implements AggregatorFactory { private final SparkRuntimeContext runtimeContext; private final Accumulator<NamedAggregators> accumulator;