Move WindowingInternals to runners/core-java

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/949ab3ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/949ab3ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/949ab3ac

Branch: refs/heads/master
Commit: 949ab3ac6d654a310a513d2e64e8dbf39fd4f388
Parents: b12e5ff
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jan 26 21:06:10 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 09:26:06 2017 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |  2 +-
 .../org/apache/beam/runners/core/OldDoFn.java   |  1 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |  1 -
 .../beam/runners/core/WindowingInternals.java   | 83 ++++++++++++++++++++
 .../core/WindowingInternalsAdapters.java        |  1 -
 .../core/GroupAlsoByWindowsProperties.java      |  1 -
 .../beam/runners/core/ReduceFnTester.java       |  1 -
 .../functions/FlinkProcessContextBase.java      |  2 +-
 .../beam/sdk/util/WindowingInternals.java       | 82 -------------------
 9 files changed, 85 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 4c2b461..c5da368 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -46,6 +46,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.WindowingInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -59,7 +60,6 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
index b099721..4033260 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 2fe9226..9f80bca 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
new file mode 100644
index 0000000..b8425b7
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * Interface that may be required by some (internal) {@link DoFn}s to implement windowing. It
+ * should not be necessary for general user code to interact with this at all.
+ *
+ * <p>This interface should be provided by runner implementors to support windowing on their runner.
+ *
+ * @param <InputT> input type
+ * @param <OutputT> output type
+ */
+public interface WindowingInternals<InputT, OutputT> {
+
+  /**
+   * Unsupported state internals. The key type is unknown. It is up to the user to use the
+   * correct type of key.
+   */
+  StateInternals<?> stateInternals();
+
+  /**
+   * Output the value at the specified timestamp in the listed windows.
+   */
+  void outputWindowedValue(OutputT output, Instant timestamp,
+      Collection<? extends BoundedWindow> windows, PaneInfo pane);
+
+  /**
+   * Output the value to a side output at the specified timestamp in the listed windows.
+   */
+  <SideOutputT> void sideOutputWindowedValue(
+      TupleTag<SideOutputT> tag,
+      SideOutputT output,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane);
+
+  /**
+   * Return the timer manager provided by the underlying system, or null if Timers need
+   * to be emulated.
+   */
+  TimerInternals timerInternals();
+
+  /**
+   * Access the windows the element is being processed in without "exploding" it.
+   */
+  Collection<? extends BoundedWindow> windows();
+
+  /**
+   * Access the pane of the current window(s).
+   */
+  PaneInfo pane();
+
+  /**
+   * Return the value of the side input for a particular side input window.
+   */
+  <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
index 7f80844..48a39d6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternalsAdapters.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index fedc4ca..98063df 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index d396a08..4f4baac 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -70,7 +70,6 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.util.state.StateNamespace;

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index e955679..cedad38 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.WindowingInternals;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -33,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/949ab3ac/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
deleted file mode 100644
index a921725..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.joda.time.Instant;
-
-/**
- * Interface that may be required by some (internal) {@link DoFn}s to implement windowing. It
- * should not be necessary for general user code to interact with this at all.
- *
- * <p>This interface should be provided by runner implementors to support windowing on their runner.
- *
- * @param <InputT> input type
- * @param <OutputT> output type
- */
-public interface WindowingInternals<InputT, OutputT> {
-
-  /**
-   * Unsupported state internals. The key type is unknown. It is up to the user to use the
-   * correct type of key.
-   */
-  StateInternals<?> stateInternals();
-
-  /**
-   * Output the value at the specified timestamp in the listed windows.
-   */
-  void outputWindowedValue(OutputT output, Instant timestamp,
-      Collection<? extends BoundedWindow> windows, PaneInfo pane);
-
-  /**
-   * Output the value to a side output at the specified timestamp in the listed windows.
-   */
-  <SideOutputT> void sideOutputWindowedValue(
-      TupleTag<SideOutputT> tag,
-      SideOutputT output,
-      Instant timestamp,
-      Collection<? extends BoundedWindow> windows,
-      PaneInfo pane);
-
-  /**
-   * Return the timer manager provided by the underlying system, or null if Timers need
-   * to be emulated.
-   */
-  TimerInternals timerInternals();
-
-  /**
-   * Access the windows the element is being processed in without "exploding" it.
-   */
-  Collection<? extends BoundedWindow> windows();
-
-  /**
-   * Access the pane of the current window(s).
-   */
-  PaneInfo pane();
-
-  /**
-   * Return the value of the side input for a particular side input window.
-   */
-  <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow);
-}

Reply via email to