Revert "Removes ArgumentProvider.windowingInternals" This reverts commit f3e8a0383bf9cb3f9452e0364f7deba113cadff9.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4aa0ee14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4aa0ee14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4aa0ee14 Branch: refs/heads/master Commit: 4aa0ee1436a8d94f7c1c75bd0151790d14635c64 Parents: a12fd8c Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Dec 16 15:26:32 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Fri Dec 16 16:39:20 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/runners/core/DoFnAdapters.java | 14 +++++ .../beam/runners/core/SimpleDoFnRunner.java | 57 ++++++++++++++++++++ .../beam/runners/core/SplittableParDo.java | 7 +++ .../apache/beam/sdk/transforms/DoFnTester.java | 7 +++ .../sdk/transforms/reflect/DoFnInvoker.java | 20 +++++++ .../transforms/reflect/DoFnInvokersTest.java | 6 +++ 6 files changed, 111 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4aa0ee14/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index a4002da..fc5847c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.WindowingInternals; import 
org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -355,6 +356,14 @@ public class DoFnAdapters { } @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this + // should be unreachable. + throw new UnsupportedOperationException( + "Can only get WindowingInternals in processElement"); + } + + @Override public DoFn.InputProvider<InputT> inputProvider() { throw new UnsupportedOperationException("inputProvider() exists only for testing"); } @@ -467,6 +476,11 @@ public class DoFnAdapters { } @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return context.windowingInternals(); + } + + @Override public DoFn.InputProvider<InputT> inputProvider() { throw new UnsupportedOperationException("inputProvider() exists only for testing"); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4aa0ee14/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index d504b40..29ef3ef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -51,10 +51,13 @@ import org.apache.beam.sdk.util.ExecutionContext.StepContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import 
org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateSpec; @@ -416,6 +419,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + throw new UnsupportedOperationException("WindowingInternals are unsupported."); + } + + @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException( "Cannot access RestrictionTracker outside of @ProcessElement method."); @@ -625,5 +633,54 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out throw new UnsupportedOperationException("Timer parameters are not supported."); } + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return new WindowingInternals<InputT, OutputT>() { + @Override + public Collection<? extends BoundedWindow> windows() { + return windowedValue.getWindows(); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public TimerInternals timerInternals() { + return context.stepContext.timerInternals(); + } + + @Override + public StateInternals<?> stateInternals() { + return stepContext.stateInternals(); + } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException("A DoFn cannot output to a different window"); + } + + @Override + public <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? 
extends BoundedWindow> windows, + PaneInfo pane) { + throw new UnsupportedOperationException( + "A DoFn cannot side output to a different window"); + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) { + return context.sideInput(view, sideInputWindow); + } + }; + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4aa0ee14/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index f8d12ec..e6a2466 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateInternals; @@ -684,6 +685,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + // DoFnSignatures should have verified that this DoFn doesn't access extra context. 
+ throw new IllegalStateException("Unexpected extra context access on a splittable DoFn"); + } + + @Override public TrackerT restrictionTracker() { return tracker; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4aa0ee14/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index b2c3fd5..2d8684a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -48,6 +48,7 @@ import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; @@ -327,6 +328,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + throw new UnsupportedOperationException( + "Not expected to access WindowingInternals from a new DoFn"); + } + + @Override public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { throw new UnsupportedOperationException( "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester"); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4aa0ee14/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 354578e..97ac9d3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -27,9 +27,11 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.DoFn.StartBundle; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.State; /** @@ -120,6 +122,19 @@ public interface DoFnInvoker<InputT, OutputT> { OutputReceiver<OutputT> outputReceiver(); /** + * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so + * an {@link OldDoFn} can be run via {@link DoFnInvoker}. + * + * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}. + * + * @deprecated Please port occurrences of {@link OldDoFn} to {@link DoFn}. If they require state + * and timers, they will need to wait for the arrival of those features. Do not introduce + * new uses of this method. + */ + @Deprecated + WindowingInternals<InputT, OutputT> windowingInternals(); + + /** * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with * the current {@link ProcessElement} call. 
*/ @@ -165,6 +180,11 @@ public interface DoFnInvoker<InputT, OutputT> { } @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + return null; + } + + @Override public State state(String stateId) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4aa0ee14/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index 4233b39..456a6eb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider; import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -51,6 +52,7 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; @@ -76,14 +78,18 @@ public class DoFnInvokersTest { @Mock private IntervalWindow mockWindow; @Mock private DoFn.InputProvider<String> mockInputProvider; @Mock private DoFn.OutputReceiver<String> mockOutputReceiver; + 
@Mock private WindowingInternals<String, String> mockWindowingInternals; @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider; + @Mock private OldDoFn<String, String> mockOldDoFn; + @Before public void setUp() { MockitoAnnotations.initMocks(this); when(mockArgumentProvider.window()).thenReturn(mockWindow); when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider); when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver); + when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals); when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext); }