[BEAM-854] Removes ReifyTimestampsAndWindows

It was effectively used in only two places:
- GatherAllPanes (used in PAssert):
  replaced with a similar custom DoFn.
- A couple of GroupByKey-related transforms:
  folded into GroupByKeyOnly.

Also makes DirectGroupByKeyOnly (GBKO) produce globally-windowed KeyedWorkItems (KWIs).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e77d881a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e77d881a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e77d881a

Branch: refs/heads/apex-runner
Commit: e77d881aecc3f5fdbc407a252f9924c1226bcb4a
Parents: 529f266
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Fri Oct 28 10:40:17 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 2 10:18:59 2016 -0700

----------------------------------------------------------------------
 .../core/GroupByKeyViaGroupByKeyOnly.java       | 13 ++--
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |  2 -
 .../beam/runners/direct/DirectGroupByKey.java   | 42 ++++++-------
 .../GroupAlsoByWindowEvaluatorFactory.java      |  4 +-
 .../direct/GroupByKeyOnlyEvaluatorFactory.java  | 31 +++++-----
 .../beam/runners/direct/ParDoEvaluator.java     |  5 +-
 .../direct/ParDoMultiEvaluatorHooks.java        |  1 +
 .../direct/ParDoSingleEvaluatorHooks.java       |  1 +
 .../direct/GroupByKeyEvaluatorFactoryTest.java  | 29 ++++-----
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     | 31 +++++-----
 .../beam/runners/direct/ParDoEvaluatorTest.java |  5 +-
 .../apache/beam/sdk/util/GatherAllPanes.java    | 18 +++++-
 .../sdk/util/ReifyTimestampAndWindowsDoFn.java  | 41 -------------
 .../sdk/util/ReifyTimestampsAndWindows.java     | 63 --------------------
 14 files changed, 94 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
index b521425..79d2252 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupByKeyViaGroupByKeyOnly.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -84,15 +83,11 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
     WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
 
     return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-
         // Group by just the key.
         // Combiner lifting will not happen regardless of the 
disallowCombinerLifting value.
         // There will be no combiners right after the GroupByKeyOnly because 
of the two ParDos
         // introduced in here.
-        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+        .apply(new GroupByKeyOnly<K, V>())
 
         // Sort each key's values by timestamp. GroupAlsoByWindow requires
         // its input to be sorted by timestamp.
@@ -112,12 +107,12 @@ public class GroupByKeyViaGroupByKeyOnly<K, V>
    * or evaluate this class.
    */
   public static class GroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, 
Iterable<WindowedValue<V>>>>> {
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> 
apply(PCollection<KV<K, V>> input) {
+      return PCollection.createPrimitiveOutputInternal(
           input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index b63e23b..680a971 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -52,7 +51,6 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
       checkArgument(input.getCoder() instanceof KvCoder);
       KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
       return input
-          .apply(new ReifyTimestampsAndWindows<KeyT, InputT>())
           // TODO: Perhaps windowing strategy should instead be set by 
ReifyTAW, or by DGBKO
           .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
           .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
index 14103a6..219314a 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKey.java
@@ -27,8 +27,6 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -55,13 +53,13 @@ class DirectGroupByKey<K, V>
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+    WindowingStrategy<?, ?> inputWindowingStrategy = 
input.getWindowingStrategy();
+    // Update the windowing strategy as appropriate.
+    WindowingStrategy<?, ?> outputWindowingStrategy =
+        original.updateWindowingStrategy(inputWindowingStrategy);
 
     // By default, implement GroupByKey via a series of lower-level operations.
     return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
         .apply(new DirectGroupByKeyOnly<K, V>())
         .setCoder(
             KeyedWorkItemCoder.of(
@@ -70,20 +68,20 @@ class DirectGroupByKey<K, V>
                 input.getWindowingStrategy().getWindowFn().windowCoder()))
 
         // Group each key's values by window, merging windows as needed.
-        .apply("GroupAlsoByWindow", new DirectGroupAlsoByWindow<K, 
V>(windowingStrategy))
+        .apply(
+            "GroupAlsoByWindow",
+            new DirectGroupAlsoByWindow<K, V>(inputWindowingStrategy, 
outputWindowingStrategy))
 
-        // And update the windowing strategy as appropriate.
-        
.setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
         .setCoder(
             KvCoder.of(inputCoder.getKeyCoder(), 
IterableCoder.of(inputCoder.getValueCoder())));
   }
 
   static final class DirectGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, 
PCollection<KeyedWorkItem<K, V>>> {
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KeyedWorkItem<K, 
V>>> {
     @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, 
WindowedValue<V>>> input) {
-      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded());
+    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, V>> input) 
{
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), WindowingStrategy.globalDefault(), 
input.isBounded());
     }
 
     DirectGroupByKeyOnly() {}
@@ -92,14 +90,18 @@ class DirectGroupByKey<K, V>
   static final class DirectGroupAlsoByWindow<K, V>
       extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, 
Iterable<V>>>> {
 
-    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final WindowingStrategy<?, ?> inputWindowingStrategy;
+    private final WindowingStrategy<?, ?> outputWindowingStrategy;
 
-    public DirectGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
+    public DirectGroupAlsoByWindow(
+        WindowingStrategy<?, ?> inputWindowingStrategy,
+        WindowingStrategy<?, ?> outputWindowingStrategy) {
+      this.inputWindowingStrategy = inputWindowingStrategy;
+      this.outputWindowingStrategy = outputWindowingStrategy;
     }
 
-    public WindowingStrategy<?, ?> getWindowingStrategy() {
-      return windowingStrategy;
+    public WindowingStrategy<?, ?> getInputWindowingStrategy() {
+      return inputWindowingStrategy;
     }
 
     private KeyedWorkItemCoder<K, V> 
getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
@@ -125,8 +127,8 @@ class DirectGroupByKey<K, V>
 
     @Override
     public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, 
V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), 
input.isBounded());
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), outputWindowingStrategy, input.isBounded());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 4115bb7..37cc319 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -103,7 +103,8 @@ class GroupAlsoByWindowEvaluatorFactory implements 
TransformEvaluatorFactory {
 
       @SuppressWarnings("unchecked")
       WindowingStrategy<?, BoundedWindow> windowingStrategy =
-          (WindowingStrategy<?, BoundedWindow>) 
application.getTransform().getWindowingStrategy();
+          (WindowingStrategy<?, BoundedWindow>)
+              application.getTransform().getInputWindowingStrategy();
 
       DirectStepContext stepContext =
           evaluationContext
@@ -125,6 +126,7 @@ class GroupAlsoByWindowEvaluatorFactory implements 
TransformEvaluatorFactory {
               evaluationContext,
               stepContext,
               application,
+              windowingStrategy,
               gabwDoFn,
               Collections.<PCollectionView<?>>emptyList(),
               MAIN_OUTPUT_TAG,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
index 2ead782..0fa7ebd 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java
@@ -67,14 +67,14 @@ class GroupByKeyOnlyEvaluatorFactory implements 
TransformEvaluatorFactory {
   @Override
   public void cleanup() {}
 
-  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
+  private <K, V> TransformEvaluator<KV<K, V>> createEvaluator(
       final AppliedPTransform<
-          PCollection<KV<K, WindowedValue<V>>>,
+          PCollection<KV<K, V>>,
           PCollection<KeyedWorkItem<K, V>>,
           DirectGroupByKeyOnly<K, V>>
           application,
-      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle) {
-    return new GroupByKeyOnlyEvaluator<>(evaluationContext, inputBundle, 
application);
+      final CommittedBundle<KV<K, V>> inputBundle) {
+    return new GroupByKeyOnlyEvaluator<>(evaluationContext, application);
   }
 
   /**
@@ -84,12 +84,11 @@ class GroupByKeyOnlyEvaluatorFactory implements 
TransformEvaluatorFactory {
    * @see GroupByKeyViaGroupByKeyOnly
    */
   private static class GroupByKeyOnlyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
+      implements TransformEvaluator<KV<K, V>> {
     private final EvaluationContext evaluationContext;
 
-    private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
     private final AppliedPTransform<
-            PCollection<KV<K, WindowedValue<V>>>,
+            PCollection<KV<K, V>>,
             PCollection<KeyedWorkItem<K, V>>,
             DirectGroupByKeyOnly<K, V>> application;
     private final Coder<K> keyCoder;
@@ -97,19 +96,17 @@ class GroupByKeyOnlyEvaluatorFactory implements 
TransformEvaluatorFactory {
 
     public GroupByKeyOnlyEvaluator(
         EvaluationContext evaluationContext,
-        CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
         AppliedPTransform<
-                PCollection<KV<K, WindowedValue<V>>>,
-                PCollection<KeyedWorkItem<K, V>>,
+            PCollection<KV<K, V>>,
+            PCollection<KeyedWorkItem<K, V>>,
             DirectGroupByKeyOnly<K, V>> application) {
       this.evaluationContext = evaluationContext;
-      this.inputBundle = inputBundle;
       this.application = application;
       this.keyCoder = getKeyCoder(application.getInput().getCoder());
       this.groupingMap = new HashMap<>();
     }
 
-    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
+    private Coder<K> getKeyCoder(Coder<KV<K, V>> coder) {
       checkState(
           coder instanceof KvCoder,
           "%s requires a coder of class %s."
@@ -118,13 +115,13 @@ class GroupByKeyOnlyEvaluatorFactory implements 
TransformEvaluatorFactory {
           getClass().getSimpleName(),
           KvCoder.class.getSimpleName());
       @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
+      Coder<K> keyCoder = ((KvCoder<K, V>) coder).getKeyCoder();
       return keyCoder;
     }
 
     @Override
-    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) 
{
-      KV<K, WindowedValue<V>> kv = element.getValue();
+    public void processElement(WindowedValue<KV<K, V>> element) {
+      KV<K, V> kv = element.getValue();
       K key = kv.getKey();
       byte[] encodedKey;
       try {
@@ -139,10 +136,10 @@ class GroupByKeyOnlyEvaluatorFactory implements 
TransformEvaluatorFactory {
       GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
       List<WindowedValue<V>> values = groupingMap.get(groupingKey);
       if (values == null) {
-        values = new ArrayList<WindowedValue<V>>();
+        values = new ArrayList<>();
         groupingMap.put(groupingKey, values);
       }
-      values.add(kv.getValue());
+      values.add(element.withValue(kv.getValue()));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 5913379..6f91319 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -31,9 +31,11 @@ import 
org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -44,6 +46,7 @@ class ParDoEvaluator<InputT, OutputT> implements 
TransformEvaluator<InputT> {
       EvaluationContext evaluationContext,
       DirectStepContext stepContext,
       AppliedPTransform<PCollection<InputT>, ?, ?> application,
+      WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
       Serializable fn, // may be OldDoFn or DoFn
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
@@ -70,7 +73,7 @@ class ParDoEvaluator<InputT, OutputT> implements 
TransformEvaluator<InputT> {
             sideOutputTags,
             stepContext,
             aggregatorChanges,
-            application.getInput().getWindowingStrategy());
+            windowingStrategy);
     PushbackSideInputDoFnRunner<InputT, OutputT> runner =
         PushbackSideInputDoFnRunner.create(underlying, sideInputs, 
sideInputReader);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
index a566154..f30f209 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorHooks.java
@@ -45,6 +45,7 @@ class ParDoMultiEvaluatorHooks<InputT, OutputT>
         evaluationContext,
         stepContext,
         application,
+        application.getInput().getWindowingStrategy(),
         fnLocal,
         transform.getSideInputs(),
         transform.getMainOutputTag(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
index b554f41..6d284c2 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorHooks.java
@@ -48,6 +48,7 @@ class ParDoSingleEvaluatorHooks<InputT, OutputT>
         evaluationContext,
         stepContext,
         application,
+        application.getInput().getWindowingStrategy(),
         fnLocal,
         transform.getSideInputs(),
         mainOutputTag,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 49d7d90..a726817 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -64,13 +63,11 @@ public class GroupByKeyEvaluatorFactoryTest {
     KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
     PCollection<KV<String, Integer>> values =
         p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, 
thirdFoo));
-    PCollection<KV<String, WindowedValue<Integer>>> kvs =
-        values.apply(new ReifyTimestampsAndWindows<String, Integer>());
     PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
-        kvs.apply(new DirectGroupByKeyOnly<String, Integer>());
+        values.apply(new DirectGroupByKeyOnly<String, Integer>());
 
-    CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
-        bundleFactory.createBundle(kvs).commit(Instant.now());
+    CommittedBundle<KV<String, Integer>> inputBundle =
+        bundleFactory.createBundle(values).commit(Instant.now());
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
     StructuralKey<String> fooKey = StructuralKey.of("foo", 
StringUtf8Coder.of());
     UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
@@ -97,17 +94,17 @@ public class GroupByKeyEvaluatorFactoryTest {
     // The input to a GroupByKey is assumed to be a KvCoder
     @SuppressWarnings("unchecked")
     Coder<String> keyCoder =
-        ((KvCoder<String, WindowedValue<Integer>>) 
kvs.getCoder()).getKeyCoder();
-    TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
+        ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
+    TransformEvaluator<KV<String, Integer>> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
             .forApplication(groupedKvs.getProducingTransformInternal(), 
inputBundle);
 
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz));
 
     evaluator.finishBundle();
 
@@ -142,10 +139,6 @@ public class GroupByKeyEvaluatorFactoryTest {
                 keyCoder)));
   }
 
-  private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
-    return KV.of(kv.getKey(), 
WindowedValue.valueInGlobalWindow(kv.getValue()));
-  }
-
   private static class KeyedWorkItemMatcher<K, V>
       extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
     private final KeyedWorkItem<K, V> myWorkItem;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 3b9dc39..3e5af14 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -64,13 +63,11 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
     KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
     PCollection<KV<String, Integer>> values =
         p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, 
thirdFoo));
-    PCollection<KV<String, WindowedValue<Integer>>> kvs =
-        values.apply(new ReifyTimestampsAndWindows<String, Integer>());
     PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
-        kvs.apply(new DirectGroupByKeyOnly<String, Integer>());
+        values.apply(new DirectGroupByKeyOnly<String, Integer>());
 
-    CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
-        bundleFactory.createBundle(kvs).commit(Instant.now());
+    CommittedBundle<KV<String, Integer>> inputBundle =
+        bundleFactory.createBundle(values).commit(Instant.now());
     EvaluationContext evaluationContext = mock(EvaluationContext.class);
 
     StructuralKey<String> fooKey = StructuralKey.of("foo", 
StringUtf8Coder.of());
@@ -90,25 +87,25 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
     // The input to a GroupByKey is assumed to be a KvCoder
     @SuppressWarnings("unchecked")
     Coder<String> keyCoder =
-        ((KvCoder<String, WindowedValue<Integer>>) 
kvs.getCoder()).getKeyCoder();
-    TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
+        ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
+    TransformEvaluator<KV<String, Integer>> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
             .forApplication(
                 groupedKvs.getProducingTransformInternal(), inputBundle);
 
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
-    
evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(thirdFoo));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBar));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(secondBar));
+    evaluator.processElement(WindowedValue.valueInGlobalWindow(firstBaz));
 
     evaluator.finishBundle();
 
     assertThat(
         fooBundle.commit(Instant.now()).getElements(),
         contains(
-            new KeyedWorkItemMatcher<String, Integer>(
+            new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
                     "foo",
                     ImmutableSet.of(
@@ -119,7 +116,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
     assertThat(
         barBundle.commit(Instant.now()).getElements(),
         contains(
-            new KeyedWorkItemMatcher<String, Integer>(
+            new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
                     "bar",
                     ImmutableSet.of(
@@ -129,7 +126,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
     assertThat(
         bazBundle.commit(Instant.now()).getElements(),
         contains(
-            new KeyedWorkItemMatcher<String, Integer>(
+            new KeyedWorkItemMatcher<>(
                 KeyedWorkItems.elementsWorkItem(
                     "baz",
                     
ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 8254413..eab92f4 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -152,10 +152,13 @@ public class ParDoEvaluatorTest {
     when(evaluationContext.getAggregatorContainer()).thenReturn(container);
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
+    AppliedPTransform<PCollection<Integer>, ?, ?> transform =
+        (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal();
     return ParDoEvaluator.create(
         evaluationContext,
         stepContext,
-        (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(),
+        transform,
+        transform.getInput().getWindowingStrategy(),
         fn,
         ImmutableList.<PCollectionView<?>>of(singletonView),
         mainOutputTag,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index a2a6e17..52a2ba8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.util;
 
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Never;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -55,8 +58,12 @@ public class GatherAllPanes<T>
     WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
 
     return input
-        .apply(WithKeys.<Integer, T>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
-        .apply(new ReifyTimestampsAndWindows<Integer, T>())
+        .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
+        .setCoder(
+            WindowedValue.FullWindowedValueCoder.of(
+                input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()))
+        .apply(
+            WithKeys.<Integer, WindowedValue<T>>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
         .apply(
             Window.into(
                     new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
@@ -69,4 +76,11 @@ public class GatherAllPanes<T>
         .apply(Values.<Iterable<WindowedValue<T>>>create())
         .setWindowingStrategyInternal(input.getWindowingStrategy());
   }
+
+  private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, WindowedValue<T>> {
+    @DoFn.ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      c.output(WindowedValue.of(c.element(), c.timestamp(), window, c.pane()));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
deleted file mode 100644
index 6da4da0..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
-import org.apache.beam.sdk.values.KV;
-
-/**
- * {@link OldDoFn} that makes timestamps and window assignments explicit in the value part of each
- * key/value pair.
- *
- * @param <K> the type of the keys of the input and output {@code PCollection}s
- * @param <V> the type of the values of the input {@code PCollection}
- */
-@SystemDoFnInternal
-public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>>
-    implements RequiresWindowAccess {
-  @Override
-  public void processElement(ProcessContext c) throws Exception {
-    KV<K, V> kv = c.element();
-    K key = kv.getKey();
-    V value = kv.getValue();
-    c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane())));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e77d881a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
deleted file mode 100644
index d129c8e..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampsAndWindows.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-
-/**
- * Helper transform that makes timestamps and window assignments explicit in the value part of
- * each key/value pair.
- */
-public class ReifyTimestampsAndWindows<K, V>
-    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
-
-  @Override
-  public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
-
-    // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
-    // to this implementation of GBK. All runners need a way to get the key.
-    checkArgument(
-        input.getCoder() instanceof KvCoder,
-        "%s requires its input to use a %s",
-        GroupByKey.class.getSimpleName(),
-        KvCoder.class.getSimpleName());
-
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
-    Coder<WindowedValue<V>> outputValueCoder =
-        FullWindowedValueCoder.of(
-            inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
-    Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-    return input
-        .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
-        .setCoder(outputKvCoder);
-  }
-}


Reply via email to