Move WindowingInternals to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/949ab3ac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/949ab3ac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/949ab3ac Branch: refs/heads/master Commit: 949ab3ac6d654a310a513d2e64e8dbf39fd4f388 Parents: b12e5ff Author: Kenneth Knowles <k...@google.com> Authored: Thu Jan 26 21:06:10 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Feb 6 09:26:06 2017 -0800 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 2 +- .../org/apache/beam/runners/core/OldDoFn.java | 1 - .../beam/runners/core/SimpleOldDoFnRunner.java | 1 - .../beam/runners/core/WindowingInternals.java | 83 ++++++++++++++++++++ .../core/WindowingInternalsAdapters.java | 1 - .../core/GroupAlsoByWindowsProperties.java | 1 - .../beam/runners/core/ReduceFnTester.java | 1 - .../functions/FlinkProcessContextBase.java | 2 +- .../beam/sdk/util/WindowingInternals.java | 82 ------------------- 9 files changed, 85 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 4c2b461..c5da368 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.WindowingInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; @@ -59,7 +60,6 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java index b099721..4033260 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 2fe9226..9f80bca 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java new file mode 100644 index 0000000..b8425b7 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.Collection; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; + +/** + * Interface that may be required by some (internal) {@link DoFn}s to implement windowing. It + * should not be necessary for general user code to interact with this at all. + * + * <p>This interface should be provided by runner implementors to support windowing on their runner. + * + * @param <InputT> input type + * @param <OutputT> output type + */ +public interface WindowingInternals<InputT, OutputT> { + + /** + * Unsupported state internals. The key type is unknown. It is up to the user to use the + * correct type of key. + */ + StateInternals<?> stateInternals(); + + /** + * Output the value at the specified timestamp in the listed windows. + */ + void outputWindowedValue(OutputT output, Instant timestamp, + Collection<? extends BoundedWindow> windows, PaneInfo pane); + + /** + * Output the value to a side output at the specified timestamp in the listed windows. + */ + <SideOutputT> void sideOutputWindowedValue( + TupleTag<SideOutputT> tag, + SideOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane); + + /** + * Return the timer manager provided by the underlying system, or null if Timers need + * to be emulated. + */ + TimerInternals timerInternals(); + + /** + * Access the windows the element is being processed in without "exploding" it. + */ + Collection<? extends BoundedWindow> windows(); + + /** + * Access the pane of the current window(s). + */ + PaneInfo pane(); + + /** + * Return the value of the side input for a particular side input window. + */ + <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow); +} http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java index 7f80844..48a39d6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java @@ -21,7 +21,6 @@ import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index fedc4ca..98063df 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -49,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index d396a08..4f4baac 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -70,7 +70,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.util.state.StateNamespace; http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index e955679..cedad38 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.runners.core.WindowingInternals; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; @@ -33,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java deleted file mode 100644 index a921725..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.util.Collection; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; - -/** - * Interface that may be required by some (internal) {@link DoFn}s to implement windowing. It - * should not be necessary for general user code to interact with this at all. - * - * <p>This interface should be provided by runner implementors to support windowing on their runner. - * - * @param <InputT> input type - * @param <OutputT> output type - */ -public interface WindowingInternals<InputT, OutputT> { - - /** - * Unsupported state internals. The key type is unknown. It is up to the user to use the - * correct type of key. - */ - StateInternals<?> stateInternals(); - - /** - * Output the value at the specified timestamp in the listed windows. - */ - void outputWindowedValue(OutputT output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane); - - /** - * Output the value to a side output at the specified timestamp in the listed windows. - */ - <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, - SideOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane); - - /** - * Return the timer manager provided by the underlying system, or null if Timers need - * to be emulated. - */ - TimerInternals timerInternals(); - - /** - * Access the windows the element is being processed in without "exploding" it. - */ - Collection<? extends BoundedWindow> windows(); - - /** - * Access the pane of the current window(s). - */ - PaneInfo pane(); - - /** - * Return the value of the side input for a particular side input window. - */ - <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow); -}