Moves DoFnAdapters to runners-core
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/149d52b5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/149d52b5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/149d52b5 Branch: refs/heads/master Commit: 149d52b56787bf3620db6b3adbad373366074a5d Parents: 50979f7 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Dec 9 17:28:16 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Thu Jan 12 12:57:14 2017 -0800 ---------------------------------------------------------------------- .../apex/translation/WindowBoundTranslator.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../apache/beam/runners/core/DoFnAdapters.java | 344 +++++++++++++++++++ .../beam/runners/core/SimpleOldDoFnRunner.java | 4 +- .../core/GroupAlsoByWindowsProperties.java | 2 +- .../functions/FlinkDoFnFunction.java | 2 +- .../functions/FlinkMultiOutputDoFnFunction.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../sdk/transforms/AggregatorRetriever.java | 13 +- .../beam/sdk/transforms/DoFnAdapters.java | 340 ------------------ .../org/apache/beam/sdk/transforms/OldDoFn.java | 2 +- .../apache/beam/sdk/transforms/NoOpOldDoFn.java | 2 +- 14 files changed, 367 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java index 33b9269..ef049e1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java @@ -22,8 +22,8 @@ import java.util.Collections; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.AssignWindowsDoFn; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 49ec1c8..173434f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -413,7 +413,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { } @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index c41cd45..1a3387c 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -38,6 +38,7 @@ import org.apache.beam.runners.apex.translation.utils.NoOpStepContext; import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions; import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable; import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; @@ -49,7 +50,6 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java new file mode 100644 index 0000000..0f5624f --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AggregatorRetriever; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.Context; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. + * + * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link + * DoFnInvoker}) rather than via {@link OldDoFn}. + */ +@Deprecated +public class DoFnAdapters { + /** Should not be instantiated. */ + private DoFnAdapters() {} + + /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) { + DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); + if (signature.processElement().observesWindow()) { + return new WindowDoFnAdapter<>(fn); + } else { + return new SimpleDoFnAdapter<>(fn); + } + } + + /** + * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link + * OldDoFn}. + */ + private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> { + private final DoFn<InputT, OutputT> fn; + private transient DoFnInvoker<InputT, OutputT> invoker; + + SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) { + super(AggregatorRetriever.getDelegatingAggregators(fn)); + this.fn = fn; + this.invoker = DoFnInvokers.invokerFor(fn); + } + + @Override + public void setup() throws Exception { + this.invoker.invokeSetup(); + } + + @Override + public void startBundle(Context c) throws Exception { + fn.prepareForProcessing(); + invoker.invokeStartBundle(new ContextAdapter<>(fn, c)); + } + + @Override + public void finishBundle(Context c) throws Exception { + invoker.invokeFinishBundle(new ContextAdapter<>(fn, c)); + } + + @Override + public void teardown() throws Exception { + this.invoker.invokeTeardown(); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c); + invoker.invokeProcessElement(adapter); + } + + @Override + public Duration getAllowedTimestampSkew() { + return fn.getAllowedTimestampSkew(); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.delegate(fn); + } + + private void readObject(java.io.ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.invoker = DoFnInvokers.invokerFor(fn); + } + } + + /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */ + private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT> + implements OldDoFn.RequiresWindowAccess { + + WindowDoFnAdapter(DoFn<InputT, OutputT> fn) { + super(fn); + } + } + + /** + * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link + * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is + * unavailable. + */ + private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private OldDoFn<InputT, OutputT>.Context context; + + private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) { + fn.super(); + this.context = context; + super.setupDelegateAggregators(); + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public void output(OutputT output) { + context.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + context.sideOutput(tag, output); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + context.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, + CombineFn<AggInputT, ?, AggOutputT> combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override + public BoundedWindow window() { + // The OldDoFn doesn't allow us to ask for these outside processElement, so this + // should be unreachable. + throw new UnsupportedOperationException( + "Can only get the window in processElement; elsewhere there is no defined window."); + } + + @Override + public Context context(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Can only get a ProcessContext in processElement"); + } + + @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Timers are not supported for OldDoFn"); + } + + @Override + public DoFn.InputProvider<InputT> inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver<OutputT> outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); + } + + @Override + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + throw new UnsupportedOperationException("This is a non-splittable DoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } + } + + /** + * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. + */ + private static class ProcessContextAdapter<InputT, OutputT> + extends DoFn<InputT, OutputT>.ProcessContext + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { + + private OldDoFn<InputT, OutputT>.ProcessContext context; + + private ProcessContextAdapter( + DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) { + fn.super(); + this.context = context; + } + + @Override + public PipelineOptions getPipelineOptions() { + return context.getPipelineOptions(); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + return context.sideInput(view); + } + + @Override + public void output(OutputT output) { + context.output(output); + } + + @Override + public void outputWithTimestamp(OutputT output, Instant timestamp) { + context.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + context.sideOutput(tag, output); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + context.sideOutputWithTimestamp(tag, output, timestamp); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( + String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { + return context.createAggregatorInternal(name, combiner); + } + + @Override + public InputT element() { + return context.element(); + } + + @Override + public Instant timestamp() { + return context.timestamp(); + } + + @Override + public PaneInfo pane() { + return context.pane(); + } + + @Override + public BoundedWindow window() { + return context.window(); + } + + @Override + public Context context(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + return this; + } + + @Override + public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); + } + + @Override + public DoFn.InputProvider<InputT> inputProvider() { + throw new UnsupportedOperationException("inputProvider() exists only for testing"); + } + + @Override + public DoFn.OutputReceiver<OutputT> outputReceiver() { + throw new UnsupportedOperationException("outputReceiver() exists only for testing"); + } + + @Override + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + throw new UnsupportedOperationException("This is a non-splittable DoFn"); + } + + @Override + public State state(String stateId) { + throw new UnsupportedOperationException("State is not supported by this runner"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("Timers are not supported by this runner"); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 1ff0212..9808e56 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -330,7 +330,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT } @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null"); return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); @@ -512,7 +512,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT } @Override - protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT> + public <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT> createAggregatorInternal( String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) { return context.createAggregatorInternal(name, combiner); http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index 97b67c6..ef01106 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -744,7 +744,7 @@ public class GroupAlsoByWindowsProperties { } @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( + public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index ed200d5..2a4a68e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index 7f6a436..a97bd46 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -18,10 +18,10 @@ package org.apache.beam.runners.flink.translation.functions; import java.util.Map; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index 6afca38..53b9803 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -252,7 +252,7 @@ abstract class FlinkProcessContextBase<InputT, OutputT> public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp); @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { @SuppressWarnings("unchecked") SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result = http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 057a3e7..95f2bfd 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.DoFnAdapters; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; @@ -42,7 +43,6 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnAdapters; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java index ce47e22..b1d3ead 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AggregatorRetriever.java @@ -18,9 +18,10 @@ package org.apache.beam.sdk.transforms; import java.util.Collection; +import java.util.Map; /** - * An internal class for extracting {@link Aggregator Aggregators} from {@link OldDoFn DoFns}. + * An internal class for extracting {@link Aggregator Aggregators} from {@link DoFn DoFns}. */ public final class AggregatorRetriever { private AggregatorRetriever() { @@ -28,9 +29,17 @@ public final class AggregatorRetriever { } /** - * Returns the {@link Aggregator Aggregators} created by the provided {@link OldDoFn}. + * Returns the {@link Aggregator Aggregators} created by the provided {@link DoFn}. */ public static Collection<Aggregator<?, ?>> getAggregators(DoFn<?, ?> fn) { return fn.getAggregators(); } + + /** + * Returns the {@link DelegatingAggregator delegating aggregators} created by the provided {@link + * DoFn}. + */ + public static Map<String, DelegatingAggregator<?, ?>> getDelegatingAggregators(DoFn<?, ?> fn) { + return fn.aggregators; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java deleted file mode 100644 index 0a71faa..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms; - -import java.io.IOException; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFn.Context; -import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; -import org.apache.beam.sdk.transforms.DoFn.ProcessContext; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.transforms.reflect.DoFnSignature; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}. - * - * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link - * DoFnInvoker}) rather than via {@link OldDoFn}. - */ -@Deprecated -public class DoFnAdapters { - /** Should not be instantiated. */ - private DoFnAdapters() {} - - /** Creates an {@link OldDoFn} that delegates to the {@link DoFn}. */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public static <InputT, OutputT> OldDoFn<InputT, OutputT> toOldDoFn(DoFn<InputT, OutputT> fn) { - DoFnSignature signature = DoFnSignatures.getSignature((Class) fn.getClass()); - if (signature.processElement().observesWindow()) { - return new WindowDoFnAdapter<>(fn); - } else { - return new SimpleDoFnAdapter<>(fn); - } - } - - /** - * Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link - * OldDoFn}. - */ - private static class SimpleDoFnAdapter<InputT, OutputT> extends OldDoFn<InputT, OutputT> { - private final DoFn<InputT, OutputT> fn; - private transient DoFnInvoker<InputT, OutputT> invoker; - - SimpleDoFnAdapter(DoFn<InputT, OutputT> fn) { - super(fn.aggregators); - this.fn = fn; - this.invoker = DoFnInvokers.invokerFor(fn); - } - - @Override - public void setup() throws Exception { - this.invoker.invokeSetup(); - } - - @Override - public void startBundle(Context c) throws Exception { - fn.prepareForProcessing(); - invoker.invokeStartBundle(new ContextAdapter<>(fn, c)); - } - - @Override - public void finishBundle(Context c) throws Exception { - invoker.invokeFinishBundle(new ContextAdapter<>(fn, c)); - } - - @Override - public void teardown() throws Exception { - this.invoker.invokeTeardown(); - } - - @Override - public void processElement(ProcessContext c) throws Exception { - ProcessContextAdapter<InputT, OutputT> adapter = new ProcessContextAdapter<>(fn, c); - invoker.invokeProcessElement(adapter); - } - - @Override - public Duration getAllowedTimestampSkew() { - return fn.getAllowedTimestampSkew(); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(fn); - } - - private void readObject(java.io.ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.invoker = DoFnInvokers.invokerFor(fn); - } - } - - /** Wraps a {@link DoFn} that requires access to {@link BoundedWindow} as an {@link OldDoFn}. */ - private static class WindowDoFnAdapter<InputT, OutputT> extends SimpleDoFnAdapter<InputT, OutputT> - implements OldDoFn.RequiresWindowAccess { - - WindowDoFnAdapter(DoFn<InputT, OutputT> fn) { - super(fn); - } - } - - /** - * Wraps an {@link OldDoFn.Context} as a {@link DoFnInvoker.ArgumentProvider} inside a {@link - * DoFn.StartBundle} or {@link DoFn.FinishBundle} method, which means the extra context is - * unavailable. - */ - private static class ContextAdapter<InputT, OutputT> extends DoFn<InputT, OutputT>.Context - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - private OldDoFn<InputT, OutputT>.Context context; - - private ContextAdapter(DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.Context context) { - fn.super(); - this.context = context; - super.setupDelegateAggregators(); - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public void output(OutputT output) { - context.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - context.sideOutput(tag, output); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( - String name, - CombineFn<AggInputT, ?, AggOutputT> combiner) { - return context.createAggregatorInternal(name, combiner); - } - - @Override - public BoundedWindow window() { - // The OldDoFn doesn't allow us to ask for these outside processElement, so this - // should be unreachable. - throw new UnsupportedOperationException( - "Can only get the window in processElement; elsewhere there is no defined window."); - } - - @Override - public Context context(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Can only get a ProcessContext in processElement"); - } - - @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Timers are not supported for OldDoFn"); - } - - @Override - public DoFn.InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("inputProvider() exists only for testing"); - } - - @Override - public DoFn.OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("outputReceiver() exists only for testing"); - } - - @Override - public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } - - /** - * Wraps an {@link OldDoFn.ProcessContext} as a {@link DoFnInvoker.ArgumentProvider} method. - */ - private static class ProcessContextAdapter<InputT, OutputT> - extends DoFn<InputT, OutputT>.ProcessContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - - private OldDoFn<InputT, OutputT>.ProcessContext context; - - private ProcessContextAdapter( - DoFn<InputT, OutputT> fn, OldDoFn<InputT, OutputT>.ProcessContext context) { - fn.super(); - this.context = context; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return context.sideInput(view); - } - - @Override - public void output(OutputT output) { - context.output(output); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - context.outputWithTimestamp(output, timestamp); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - context.sideOutput(tag, output); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.sideOutputWithTimestamp(tag, output, timestamp); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator( - String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - return context.createAggregatorInternal(name, combiner); - } - - @Override - public InputT element() { - return context.element(); - } - - @Override - public Instant timestamp() { - return context.timestamp(); - } - - @Override - public PaneInfo pane() { - return context.pane(); - } - - @Override - public BoundedWindow window() { - return context.window(); - } - - @Override - public Context context(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException("Timers are not supported for OldDoFn"); - } - - @Override - public DoFn.InputProvider<InputT> inputProvider() { - throw new UnsupportedOperationException("inputProvider() exists only for testing"); - } - - @Override - public DoFn.OutputReceiver<OutputT> outputReceiver() { - throw new UnsupportedOperationException("outputReceiver() exists only for testing"); - } - - @Override - public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { - throw new UnsupportedOperationException("This is a non-splittable DoFn"); - } - - @Override - public State state(String stateId) { - throw new UnsupportedOperationException("State is not supported by this runner"); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException("Timers are not supported by this runner"); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index 0aef552..7b04533 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -192,7 +192,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl * context */ @Experimental(Kind.AGGREGATOR) - protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + public abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner); /** http://git-wip-us.apache.org/repos/asf/beam/blob/149d52b5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java index 504480b..0db130d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java @@ -63,7 +63,7 @@ class NoOpOldDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> { Instant timestamp) { } @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { return null; }