Moves OldDoFn to runners-core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f8b8c5b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f8b8c5b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f8b8c5b Branch: refs/heads/master Commit: 5f8b8c5b06cfd49c4293a20dff2eea99f1076444 Parents: 77c7505 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Tue Jan 17 16:12:39 2017 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Fri Jan 20 13:31:58 2017 -0800 ---------------------------------------------------------------------- .../apex/translation/WindowBoundTranslator.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../beam/runners/core/AssignWindowsDoFn.java | 3 +- .../apache/beam/runners/core/DoFnAdapters.java | 1 - .../apache/beam/runners/core/DoFnRunner.java | 1 - .../apache/beam/runners/core/DoFnRunners.java | 1 - .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 - .../runners/core/GroupAlsoByWindowsDoFn.java | 1 - .../core/LateDataDroppingDoFnRunner.java | 1 - .../org/apache/beam/runners/core/OldDoFn.java | 472 +++++++++++++++++++ .../beam/runners/core/SimpleOldDoFnRunner.java | 3 +- .../core/DoFnDelegatingAggregatorTest.java | 144 ++++++ .../apache/beam/runners/core/NoOpOldDoFn.java | 72 +++ .../beam/runners/core/OldDoFnContextTest.java | 72 +++ .../apache/beam/runners/core/OldDoFnTest.java | 192 ++++++++ .../runners/core/SimpleOldDoFnRunnerTest.java | 2 +- .../runners/flink/OldPerKeyCombineFnRunner.java | 2 +- .../flink/OldPerKeyCombineFnRunners.java | 2 +- .../functions/FlinkDoFnFunction.java | 2 +- .../FlinkMergingNonShuffleReduceFunction.java | 2 +- .../functions/FlinkMultiOutputDoFnFunction.java | 2 +- .../FlinkMultiOutputProcessContext.java | 2 +- .../functions/FlinkNoElementAssignContext.java | 2 +- .../functions/FlinkPartialReduceFunction.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../functions/FlinkReduceFunction.java | 2 +- .../FlinkSingleOutputProcessContext.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 4 +- .../sdk/transforms/DelegatingAggregator.java | 2 +- .../org/apache/beam/sdk/transforms/OldDoFn.java | 465 ------------------ .../org/apache/beam/sdk/util/NameUtils.java | 2 +- .../DoFnDelegatingAggregatorTest.java | 142 ------ .../apache/beam/sdk/transforms/NoOpOldDoFn.java | 71 --- .../beam/sdk/transforms/OldDoFnContextTest.java | 69 --- .../apache/beam/sdk/transforms/OldDoFnTest.java | 187 -------- .../org/apache/beam/sdk/util/NameUtilsTest.java | 20 +- 39 files changed, 982 insertions(+), 978 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java index ef049e1..50af81d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java @@ -23,8 +23,8 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.AssignWindowsDoFn; import org.apache.beam.runners.core.DoFnAdapters; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 173434f..4c2b461 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -44,6 +44,7 @@ import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOption import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -52,7 +53,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.CoderUtils; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index de4c15d..808001e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -43,6 +43,7 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.sdk.coders.Coder; @@ -50,7 +51,6 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.UserCodeException; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java index 0eb1667..bbf3574 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java @@ -21,8 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; import java.util.Collection; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; +import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index 0f5624f..23aba58 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index 7c73a34..66f95db 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index 2f3e93c..f3972ae 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -22,7 +22,6 @@ import org.apache.beam.runners.core.ExecutionContext.StepContext; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index d79683a..ecce4fc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -21,7 +21,6 @@ import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java index 9a2f8fd..7e96136 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 290171a..9436ccf 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -22,7 +22,6 @@ import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java new file mode 100644 index 0000000..b099721 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DelegatingAggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * The argument to {@link ParDo} providing the code to use to process + * elements of the input + * {@link org.apache.beam.sdk.values.PCollection}. + * + * <p>See {@link ParDo} for more explanation, examples of use, and + * discussion of constraints on {@code OldDoFn}s, including their + * serializability, lack of access to global shared mutable state, + * requirements for failure tolerance, and benefits of optimization. + * + * <p>{@code OldDoFn}s can be tested in the context of a particular + * {@code Pipeline} by running that {@code Pipeline} on sample input + * and then checking its output. Unit testing of a {@code OldDoFn}, + * separately from any {@code ParDo} transform or {@code Pipeline}, + * can be done via the {@link DoFnTester} harness. + * + * <p>{@link DoFn} (currently experimental) offers an alternative + * mechanism for accessing {@link ProcessContext#window()} without the need + * to implement {@link RequiresWindowAccess}. + * + * <p>See also {@link #processElement} for details on implementing the transformation + * from {@code InputT} to {@code OutputT}. + * + * @param <InputT> the type of the (main) input elements + * @param <OutputT> the type of the (main) output elements + * @deprecated Uses of {@link OldDoFn} should be replaced by the new {@link DoFn}. + */ +@Deprecated +public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData { + /** + * Information accessible to all methods in this {@code OldDoFn}. + * Used primarily to output elements. + */ + public abstract class Context { + + /** + * Returns the {@code PipelineOptions} specified with the + * {@link org.apache.beam.sdk.runners.PipelineRunner} + * invoking this {@code OldDoFn}. The {@code PipelineOptions} will + * be the default running via {@link DoFnTester}. + */ + public abstract PipelineOptions getPipelineOptions(); + + /** + * Adds the given element to the main output {@code PCollection}. + * + * <p>Once passed to {@code output} the element should be considered + * immutable and not be modified in any way. It may be cached or retained + * by a Beam runner or later steps in the pipeline, or used in + * other unspecified ways. + * + * <p>If invoked from {@link OldDoFn#processElement processElement}, the output + * element will have the same timestamp and be in the same windows + * as the input element passed to {@link OldDoFn#processElement processElement}. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element. The output element + * will have a timestamp of negative infinity. + */ + public abstract void output(OutputT output); + + /** + * Adds the given element to the main output {@code PCollection}, + * with the given timestamp. + * + * <p>Once passed to {@code outputWithTimestamp} the element should not be + * modified in any way. + * + * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp + * must not be older than the input element's timestamp minus + * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will + * be in the same windows as the input element. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element except for the + * timestamp. + */ + public abstract void outputWithTimestamp(OutputT output, Instant timestamp); + + /** + * Adds the given element to the side output {@code PCollection} with the + * given tag. + * + * <p>Once passed to {@code sideOutput} the element should not be modified + * in any way. + * + * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to + * specify the tags of side outputs that it consumes. Non-consumed side + * outputs, e.g., outputs for monitoring purposes only, don't necessarily + * need to be specified. + * + * <p>The output element will have the same timestamp and be in the same + * windows as the input element passed to {@link OldDoFn#processElement processElement}. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element. The output element + * will have a timestamp of negative infinity. + * + * @see ParDo#withOutputTags + */ + public abstract <T> void sideOutput(TupleTag<T> tag, T output); + + /** + * Adds the given element to the specified side output {@code PCollection}, + * with the given timestamp. + * + * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be + * modified in any way. + * + * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp + * must not be older than the input element's timestamp minus + * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}. The output element will + * be in the same windows as the input element. + * + * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle}, + * this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} + * of the input {@code PCollection} to determine what windows the element + * should be in, throwing an exception if the {@code WindowFn} attempts + * to access any information about the input element except for the + * timestamp. + * + * @see ParDo#withOutputTags + */ + public abstract <T> void sideOutputWithTimestamp( + TupleTag<T> tag, T output, Instant timestamp); + + /** + * Creates an {@link Aggregator} in the {@link OldDoFn} context with the + * specified name and aggregation logic specified by {@link CombineFn}. + * + * <p>For internal use only. + * + * @param name the name of the aggregator + * @param combiner the {@link CombineFn} to use in the aggregator + * @return an aggregator for the provided name and {@link CombineFn} in this + * context + */ + @Experimental(Kind.AGGREGATOR) + public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner); + + /** + * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are + * usable within this context. + * + * <p>This method should be called by runners before {@link OldDoFn#startBundle} + * is executed. + */ + @Experimental(Kind.AGGREGATOR) + protected final void setupDelegateAggregators() { + for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) { + setupDelegateAggregator(aggregator); + } + + aggregatorsAreFinal = true; + } + + private <AggInputT, AggOutputT> void setupDelegateAggregator( + DelegatingAggregator<AggInputT, AggOutputT> aggregator) { + + Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal( + aggregator.getName(), aggregator.getCombineFn()); + + aggregator.setDelegate(delegate); + } + } + + /** + * Information accessible when running {@link OldDoFn#processElement}. + */ + public abstract class ProcessContext extends Context { + + /** + * Returns the input element to be processed. + * + * <p>The element should be considered immutable. A Beam runner will not mutate the + * element, so it is safe to cache, etc. The element should not be mutated by any of the + * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the runner + * runtime, or used in other unspecified ways. + */ + public abstract InputT element(); + + /** + * Returns the value of the side input for the window corresponding to the + * window of the main input element. + * + * <p>See + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow} + * for how this corresponding window is determined. + * + * @throws IllegalArgumentException if this is not a side input + * @see ParDo#withSideInputs + */ + public abstract <T> T sideInput(PCollectionView<T> view); + + /** + * Returns the timestamp of the input element. + * + * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + */ + public abstract Instant timestamp(); + + /** + * Returns the window into which the input element has been assigned. + * + * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + * + * @throws UnsupportedOperationException if this {@link OldDoFn} does + * not implement {@link RequiresWindowAccess}. + */ + public abstract BoundedWindow window(); + + /** + * Returns information about the pane within this window into which the + * input element has been assigned. + * + * <p>Generally all data is in a single, uninteresting pane unless custom + * triggering and/or late data has been explicitly requested. + * See {@link org.apache.beam.sdk.transforms.windowing.Window} + * for more information. + */ + public abstract PaneInfo pane(); + + /** + * Returns the process context to use for implementing windowing. + */ + @Experimental + public abstract WindowingInternals<InputT, OutputT> windowingInternals(); + } + + /** + * Returns the allowed timestamp skew duration, which is the maximum + * duration that timestamps can be shifted backward in + * {@link OldDoFn.Context#outputWithTimestamp}. + * + * <p>The default value is {@code Duration.ZERO}, in which case + * timestamps can only be shifted forward to future. For infinite + * skew, return {@code Duration.millis(Long.MAX_VALUE)}. + * + * <p>Note that producing an element whose timestamp is less than the + * current timestamp may result in late data, i.e. returning a non-zero + * value here does not impact watermark calculations used for firing + * windows. + * + * @deprecated does not interact well with the watermark. + */ + @Deprecated + public Duration getAllowedTimestampSkew() { + return Duration.ZERO; + } + + /** + * Interface for signaling that a {@link OldDoFn} needs to access the window the + * element is being processed in, via {@link OldDoFn.ProcessContext#window}. + */ + @Experimental + public interface RequiresWindowAccess {} + + public OldDoFn() { + this(new HashMap<String, DelegatingAggregator<?, ?>>()); + } + + public OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) { + this.aggregators = aggregators; + } + + ///////////////////////////////////////////////////////////////////////////// + + private final Map<String, DelegatingAggregator<?, ?>> aggregators; + + /** + * Protects aggregators from being created after initialization. + */ + private boolean aggregatorsAreFinal; + + /** + * Prepares this {@link DoFn} instance for processing bundles. + * + * <p>{@link #setup()} will be called at most once per {@link DoFn} instance, and before any other + * {@link DoFn} method is called. + * + * <p>By default, does nothing. + */ + public void setup() throws Exception { + } + + /** + * Prepares this {@code OldDoFn} instance for processing a batch of elements. + * + * <p>By default, does nothing. + */ + public void startBundle(Context c) throws Exception { + } + + /** + * Processes one input element. + * + * <p>The current element of the input {@code PCollection} is returned by + * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Beam + * runner will not mutate the element, so it is safe to cache, etc. The element should not be + * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by + * the Beam runner, or used in other unspecified ways. + * + * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}. + * Once passed to {@code output} the element should be considered immutable and not be modified in + * any way. It may be cached elsewhere, retained by the Beam runner, or used in other + * unspecified ways. + * + * @see ProcessContext + */ + public abstract void processElement(ProcessContext c) throws Exception; + + /** + * Finishes processing this batch of elements. + * + * <p>By default, does nothing. + */ + public void finishBundle(Context c) throws Exception { + } + + /** + * Cleans up this {@link DoFn}. + * + * <p>{@link #teardown()} will be called before the {@link PipelineRunner} discards a {@link DoFn} + * instance, including due to another {@link DoFn} method throwing an {@link Exception}. No other + * {@link DoFn} methods will be called after a call to {@link #teardown()}. + * + * <p>By default, does nothing. + */ + public void teardown() throws Exception { + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * {@inheritDoc} + * + * <p>By default, does not register any display data. Implementors may override this method + * to provide their own display data. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } + + ///////////////////////////////////////////////////////////////////////////// + + /** + * Returns an {@link Aggregator} with aggregation logic specified by the + * {@link CombineFn} argument. The name provided must be unique across + * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created + * during pipeline construction. + * + * @param name the name of the aggregator + * @param combiner the {@link CombineFn} to use in the aggregator + * @return an aggregator for the provided name and combiner in the scope of + * this OldDoFn + * @throws NullPointerException if the name or combiner is null + * @throws IllegalArgumentException if the given name collides with another + * aggregator in this scope + * @throws IllegalStateException if called during pipeline processing. + */ + protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) { + checkNotNull(name, "name cannot be null"); + checkNotNull(combiner, "combiner cannot be null"); + checkArgument(!aggregators.containsKey(name), + "Cannot create aggregator with name %s." + + " An Aggregator with that name already exists within this scope.", + name); + + checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing." + + " Aggregators should be registered during pipeline construction."); + + DelegatingAggregator<AggInputT, AggOutputT> aggregator = + new DelegatingAggregator<>(name, combiner); + aggregators.put(name, aggregator); + return aggregator; + } + + /** + * Returns an {@link Aggregator} with the aggregation logic specified by the + * {@link SerializableFunction} argument. The name provided must be unique + * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be + * created during pipeline construction. + * + * @param name the name of the aggregator + * @param combiner the {@link SerializableFunction} to use in the aggregator + * @return an aggregator for the provided name and combiner in the scope of + * this OldDoFn + * @throws NullPointerException if the name or combiner is null + * @throws IllegalArgumentException if the given name collides with another + * aggregator in this scope + * @throws IllegalStateException if called during pipeline processing. + */ + protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name, + SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) { + checkNotNull(combiner, "combiner cannot be null."); + return createAggregator(name, Combine.IterableCombineFn.of(combiner)); + } + + /** + * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}. + */ + Collection<Aggregator<?, ?>> getAggregators() { + return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 9808e56..2fe9226 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -27,11 +27,10 @@ import java.util.List; import java.util.Set; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java new file mode 100644 index 0000000..b44e8a4 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DelegatingAggregator; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link DelegatingAggregator}. + */ +@RunWith(JUnit4.class) +public class DoFnDelegatingAggregatorTest { + + @Mock + private Aggregator<Long, Long> delegate; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testAddValueWithoutDelegateThrowsException() { + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + String name = "agg"; + CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); + + DelegatingAggregator<Double, Double> aggregator = + (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("cannot be called"); + thrown.expectMessage("DoFn"); + + aggregator.addValue(21.2); + } + + @Test + public void testSetDelegateThenAddValueCallsDelegate() { + String name = "agg"; + CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + DelegatingAggregator<Long, Long> aggregator = + (DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner); + + aggregator.setDelegate(delegate); + + aggregator.addValue(12L); + + verify(delegate).addValue(12L); + } + + @Test + public void testSetDelegateWithExistingDelegateStartsDelegatingToSecond() { + String name = "agg"; + CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + DelegatingAggregator<Double, Double> aggregator = + (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); + + @SuppressWarnings("unchecked") + Aggregator<Double, Double> secondDelegate = + mock(Aggregator.class, "secondDelegate"); + + aggregator.setDelegate(aggregator); + aggregator.setDelegate(secondDelegate); + + aggregator.addValue(2.25); + + verify(secondDelegate).addValue(2.25); + verify(delegate, never()).addValue(anyLong()); + } + + @Test + public void testGetNameReturnsName() { + String name = "agg"; + CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + DelegatingAggregator<Double, Double> aggregator = + (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); + + assertEquals(name, aggregator.getName()); + } + + @Test + public void testGetCombineFnReturnsCombineFn() { + String name = "agg"; + CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + DelegatingAggregator<Double, Double> aggregator = + (DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner); + + assertEquals(combiner, aggregator.getCombineFn()); + } + + @SuppressWarnings("unchecked") + private static <T> CombineFn<T, ?, T> mockCombineFn( + @SuppressWarnings("unused") Class<T> clazz) { + return mock(CombineFn.class); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java new file mode 100644 index 0000000..5cbea8c --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * A {@link OldDoFn} that does nothing with provided elements. Used for testing + * methods provided by the {@link OldDoFn} abstract class. + * + * @param <InputT> unused. + * @param <OutputT> unused. + */ +class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> { + @Override + public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception { + } + + /** + * Returns a new NoOp Context. + */ + public OldDoFn<InputT, OutputT>.Context context() { + return new NoOpDoFnContext(); + } + + /** + * A {@link OldDoFn.Context} that does nothing and returns exclusively null. + */ + private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context { + @Override + public PipelineOptions getPipelineOptions() { + return null; + } + @Override + public void output(OutputT output) { + } + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + } + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + } + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, + Instant timestamp) { + } + @Override + public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java new file mode 100644 index 0000000..a1cd49d --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Sum; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link OldDoFn.Context}. + */ +@RunWith(JUnit4.class) +public class OldDoFnContextTest { + + @Mock + private Aggregator<Long, Long> agg; + + private OldDoFn<Object, Object> fn; + private OldDoFn<Object, Object>.Context context; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + // Need to be real objects to call the constructor, and to reference the + // outer instance of OldDoFn + NoOpOldDoFn<Object, Object> noOpFn = new NoOpOldDoFn<>(); + OldDoFn<Object, Object>.Context noOpContext = noOpFn.context(); + + fn = spy(noOpFn); + context = spy(noOpContext); + } + + @Test + public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() { + Combine.BinaryCombineLongFn combiner = Sum.ofLongs(); + Aggregator<Long, Long> delegateAggregator = + fn.createAggregator("test", combiner); + + when(context.createAggregatorInternal("test", combiner)).thenReturn(agg); + + context.setupDelegateAggregators(); + delegateAggregator.addValue(1L); + + verify(agg).addValue(1L); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java new file mode 100644 index 0000000..651bc72 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; + +import java.io.Serializable; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for OldDoFn. + */ +@RunWith(JUnit4.class) +public class OldDoFnTest implements Serializable { + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testCreateAggregatorWithCombinerSucceeds() { + String name = "testAggregator"; + Combine.BinaryCombineLongFn combiner = Sum.ofLongs(); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner); + + assertEquals(name, aggregator.getName()); + assertEquals(combiner, aggregator.getCombineFn()); + } + + @Test + public void testCreateAggregatorWithNullNameThrowsException() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("name cannot be null"); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + doFn.createAggregator(null, Sum.ofLongs()); + } + + @Test + public void testCreateAggregatorWithNullCombineFnThrowsException() { + CombineFn<Object, Object, Object> combiner = null; + + thrown.expect(NullPointerException.class); + thrown.expectMessage("combiner cannot be null"); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + doFn.createAggregator("testAggregator", combiner); + } + + @Test + public void testCreateAggregatorWithNullSerializableFnThrowsException() { + SerializableFunction<Iterable<Object>, Object> combiner = null; + + thrown.expect(NullPointerException.class); + thrown.expectMessage("combiner cannot be null"); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + doFn.createAggregator("testAggregator", combiner); + } + + @Test + public void testCreateAggregatorWithSameNameThrowsException() { + String name = "testAggregator"; + CombineFn<Double, ?, Double> combiner = Max.ofDoubles(); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + doFn.createAggregator(name, combiner); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Cannot create"); + thrown.expectMessage(name); + thrown.expectMessage("already exists"); + + doFn.createAggregator(name, combiner); + } + + @Test + public void testCreateAggregatorsWithDifferentNamesSucceeds() { + String nameOne = "testAggregator"; + String nameTwo = "aggregatorPrime"; + CombineFn<Double, ?, Double> combiner = Max.ofDoubles(); + + OldDoFn<Void, Void> doFn = new NoOpOldDoFn<>(); + + Aggregator<Double, Double> aggregatorOne = + doFn.createAggregator(nameOne, combiner); + Aggregator<Double, Double> aggregatorTwo = + doFn.createAggregator(nameTwo, combiner); + + assertNotEquals(aggregatorOne, aggregatorTwo); + } + + @Test + public void testCreateAggregatorThrowsWhenAggregatorsAreFinal() throws Exception { + OldDoFn<String, String> fn = new OldDoFn<String, String>() { + @Override + public void processElement(ProcessContext c) throws Exception { } + }; + OldDoFn<String, String>.Context context = createContext(fn); + context.setupDelegateAggregators(); + + thrown.expect(isA(IllegalStateException.class)); + fn.createAggregator("anyAggregate", Max.ofIntegers()); + } + + private OldDoFn<String, String>.Context createContext(OldDoFn<String, String> fn) { + return fn.new Context() { + @Override + public PipelineOptions getPipelineOptions() { + throw new UnsupportedOperationException(); + } + + @Override + public void output(String output) { + throw new UnsupportedOperationException(); + } + + @Override + public void outputWithTimestamp(String output, Instant timestamp) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + throw new UnsupportedOperationException(); + } + + @Override + public <AggInputT, AggOutputT> + Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + throw new UnsupportedOperationException(); + } + }; + } + + @Test + public void testPopulateDisplayDataDefaultBehavior() { + OldDoFn<String, String> usesDefault = + new OldDoFn<String, String>() { + @Override + public void processElement(ProcessContext c) throws Exception {} + }; + + DisplayData data = DisplayData.from(usesDefault); + assertThat(data.items(), empty()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java index 4610069..97da9ee 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java @@ -23,7 +23,7 @@ import static org.mockito.Mockito.mock; import java.util.Arrays; import java.util.List; import org.apache.beam.runners.core.BaseExecutionContext.StepContext; -import org.apache.beam.sdk.transforms.OldDoFn; + import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java index 5d676dc..71c3aa4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.flink; import java.io.Serializable; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; /** * An interface that runs a {@link PerKeyCombineFn} with unified APIs using http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java index 8ebeadf..90894f2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java @@ -17,13 +17,13 @@ */ package org.apache.beam.runners.flink; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PerKeyCombineFnRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.values.PCollectionView; /** http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 2a4a68e..8b2bcc6 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; import org.apache.beam.runners.core.DoFnAdapters; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 1b43172..5ec6a77 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -24,12 +24,12 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index a97bd46..aeeabbf 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -19,10 +19,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; import org.apache.beam.runners.core.DoFnAdapters; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java index a3d2b18..7882b5f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java @@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Collection; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java index c890272..ad7255b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.flink.translation.functions; -import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index f5a9087..7db30d1 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -24,12 +24,12 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 53b9803..e955679 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -24,11 +24,11 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index a3fa0d4..81e37f4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -26,12 +26,12 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java index 529b1cc..0db7f5a 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java @@ -19,8 +19,8 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Collection; import java.util.Map; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 90cdf4c..ac85b3c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -43,7 +44,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index cd6b5aa..d4273b2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -42,12 +42,12 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 0c5be90..4d80a39 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -21,7 +21,6 @@ import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -41,8 +40,7 @@ public class DoFnInfo<InputT, OutputT> implements Serializable { private final Map<Long, TupleTag<?>> outputMap; /** - * Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a - * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob. + * Creates a {@link DoFnInfo} for the given {@link DoFn}. */ public static <InputT, OutputT> DoFnInfo<InputT, OutputT> forFn( DoFn<InputT, OutputT> doFn, http://git-wip-us.apache.org/repos/asf/beam/blob/5f8b8c5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java index e03d3b1..cfaf0a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; * @param <AggInputT> the type of input element * @param <AggOutputT> the type of output element */ -class DelegatingAggregator<AggInputT, AggOutputT> +public class DelegatingAggregator<AggInputT, AggOutputT> implements Aggregator<AggInputT, AggOutputT>, Serializable { private static final AtomicInteger ID_GEN = new AtomicInteger(); private final int id;