Do not Reassign Windows when WindowFn is null

Adjusting the windowing strategy should not change any elements of the
data. Reassigning windows in that case is also potentially type-unsafe,
as the upstream WindowFn may only accept elements of a type that differs
from the input element type of the downstream PTransform.

Introduce Window.Assign, which replaces Window.Bound as the primitive to
"assign elements to windows based on the WindowFn". This converts
Window.Bound into a composite in all cases.

When the WindowFn is null, use a Flatten instead of an opaque DoFn to
produce the output; this improves performance on many runners.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf9b9b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf9b9b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf9b9b3

Branch: refs/heads/master
Commit: eaf9b9b36dec1cc421335b27f225663ce42d0cca
Parents: ca678d8
Author: Thomas Groh <tg...@google.com>
Authored: Fri Feb 24 11:29:42 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 1 17:51:19 2017 -0800

----------------------------------------------------------------------
 .../translation/ApexPipelineTranslator.java     |   2 +-
 .../translation/WindowAssignTranslator.java     |  78 +++++++++++
 .../apex/translation/WindowBoundTranslator.java |  78 -----------
 .../direct/TransformEvaluatorRegistry.java      |   2 +-
 .../runners/direct/WindowEvaluatorFactory.java  |  11 +-
 .../direct/WindowEvaluatorFactoryTest.java      |  46 +------
 .../flink/FlinkBatchTransformTranslators.java   |   8 +-
 .../FlinkStreamingTransformTranslators.java     |   8 +-
 .../functions/FlinkAssignWindows.java           |   2 +-
 .../dataflow/DataflowPipelineTranslator.java    |   9 +-
 .../spark/translation/TransformTranslator.java  |   8 +-
 .../spark/translation/TranslationUtils.java     |   4 +-
 .../streaming/StreamingTransformTranslator.java |   8 +-
 .../beam/sdk/transforms/windowing/Window.java   |  43 +++++-
 .../sdk/transforms/windowing/WindowTest.java    | 136 +++++++++++++++++++
 15 files changed, 290 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index e9d6571..951a286 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -71,7 +71,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
         new CreateApexPCollectionViewTranslator());
     registerTransformTranslator(CreatePCollectionView.class,
         new CreatePCollectionViewTranslator());
-    registerTransformTranslator(Window.Bound.class, new 
WindowBoundTranslator());
+    registerTransformTranslator(Window.Assign.class, new 
WindowAssignTranslator());
   }
 
   public ApexPipelineTranslator(ApexPipelineOptions options) {

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
new file mode 100644
index 0000000..b3aef8d
--- /dev/null
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.translation;
+
+import java.util.Collections;
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
+import org.apache.beam.runners.core.AssignWindowsDoFn;
+import org.apache.beam.runners.core.DoFnAdapters;
+import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * Translates {@link Window.Assign} into an {@link ApexParDoOperator} that wraps an {@link
+ * AssignWindowsDoFn}, or an identity {@link DoFn} when the transform has no {@code WindowFn}.
+ */
+class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> {
+  private static final long serialVersionUID = 1L;
+
+  @SuppressWarnings("unchecked") // casts of the context's input/output and windowing strategy
+  @Override
+  public void translate(Window.Assign<T> transform, TranslationContext context) {
+    PCollection<T> output = (PCollection<T>) context.getOutput();
+    PCollection<T> input = (PCollection<T>) context.getInput();
+    WindowingStrategy<T, BoundedWindow> windowingStrategy =
+        (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
+
+    OldDoFn<T, T> fn =
+        (transform.getWindowFn() == null)
+            ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
+            : new AssignWindowsDoFn<>(transform.getWindowFn());
+
+    ApexParDoOperator<T, T> operator =
+        new ApexParDoOperator<T, T>(
+            context.getPipelineOptions().as(ApexPipelineOptions.class),
+            fn,
+            new TupleTag<T>(),
+            TupleTagList.empty().getAll(),
+            windowingStrategy,
+            Collections.<PCollectionView<?>>emptyList(),
+            WindowedValue.getFullCoder(
+                input.getCoder(), windowingStrategy.getWindowFn().windowCoder()),
+            context.<Void>stateInternalsFactory());
+    context.addOperator(operator, operator.output);
+    context.addStream(context.getInput(), operator.input);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
deleted file mode 100644
index a241cad..0000000
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.apex.translation;
-
-import java.util.Collections;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator;
-import org.apache.beam.runners.core.AssignWindowsDoFn;
-import org.apache.beam.runners.core.DoFnAdapters;
-import org.apache.beam.runners.core.OldDoFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-/**
- * {@link Window.Bound} is translated to {link ApexParDoOperator} that wraps 
an {@link
- * AssignWindowsDoFn}.
- */
-class WindowBoundTranslator<T> implements TransformTranslator<Window.Bound<T>> 
{
-  private static final long serialVersionUID = 1L;
-
-  @Override
-  public void translate(Window.Bound<T> transform, TranslationContext context) 
{
-    PCollection<T> output = (PCollection<T>) context.getOutput();
-    PCollection<T> input = (PCollection<T>) context.getInput();
-    @SuppressWarnings("unchecked")
-    WindowingStrategy<T, BoundedWindow> windowingStrategy =
-        (WindowingStrategy<T, BoundedWindow>) output.getWindowingStrategy();
-
-    OldDoFn<T, T> fn =
-        (transform.getWindowFn() == null)
-            ? DoFnAdapters.toOldDoFn(new IdentityFn<T>())
-            : new AssignWindowsDoFn<>(transform.getWindowFn());
-
-    ApexParDoOperator<T, T> operator =
-        new ApexParDoOperator<T, T>(
-            context.getPipelineOptions().as(ApexPipelineOptions.class),
-            fn,
-            new TupleTag<T>(),
-            TupleTagList.empty().getAll(),
-            windowingStrategy,
-            Collections.<PCollectionView<?>>emptyList(),
-            WindowedValue.getFullCoder(
-                input.getCoder(), 
windowingStrategy.getWindowFn().windowCoder()),
-            context.<Void>stateInternalsFactory());
-    context.addOperator(operator, operator.output);
-    context.addStream(context.getInput(), operator.input);
-  }
-
-  private static class IdentityFn<T> extends DoFn<T, T> {
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      c.output(c.element());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 9fdefc3..62fee53 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -55,7 +55,7 @@ class TransformEvaluatorRegistry implements 
TransformEvaluatorFactory {
             .put(StatefulParDo.class, new 
StatefulParDoEvaluatorFactory<>(ctxt))
             .put(PCollections.class, new FlattenEvaluatorFactory(ctxt))
             .put(ViewEvaluatorFactory.WriteView.class, new 
ViewEvaluatorFactory(ctxt))
-            .put(Window.Bound.class, new WindowEvaluatorFactory(ctxt))
+            .put(Window.Assign.class, new WindowEvaluatorFactory(ctxt))
             // Runner-specific primitives used in expansion of GroupByKey
             .put(DirectGroupByKeyOnly.class, new 
GroupByKeyOnlyEvaluatorFactory(ctxt))
             .put(DirectGroupAlsoByWindow.class, new 
GroupAlsoByWindowEvaluatorFactory(ctxt))

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 3cf178c..8974c67 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -26,7 +26,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -34,7 +33,7 @@ import org.joda.time.Instant;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
- * {@link Bound Window.Bound} primitive {@link PTransform}.
+ * {@link Window.Assign} primitive {@link PTransform}.
  */
 class WindowEvaluatorFactory implements TransformEvaluatorFactory {
   private final EvaluationContext evaluationContext;
@@ -53,7 +52,8 @@ class WindowEvaluatorFactory implements 
TransformEvaluatorFactory {
   }
 
   private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
-      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, 
Window.Bound<InputT>> transform) {
+      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, 
Window.Assign<InputT>>
+          transform) {
     WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
     UncommittedBundle<InputT> outputBundle =
         evaluationContext.createBundle(
@@ -68,14 +68,15 @@ class WindowEvaluatorFactory implements 
TransformEvaluatorFactory {
   public void cleanup() {}
 
   private static class WindowIntoEvaluator<InputT> implements 
TransformEvaluator<InputT> {
-    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, 
Window.Bound<InputT>>
+    private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, 
Window.Assign<InputT>>
         transform;
     private final WindowFn<InputT, ?> windowFn;
     private final UncommittedBundle<InputT> outputBundle;
 
     @SuppressWarnings("unchecked")
     public WindowIntoEvaluator(
-        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, 
Window.Bound<InputT>> transform,
+        AppliedPTransform<PCollection<InputT>, PCollection<InputT>, 
Window.Assign<InputT>>
+            transform,
         WindowFn<? super InputT, ?> windowFn,
         UncommittedBundle<InputT> outputBundle) {
       this.outputBundle = outputBundle;

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index 7e6eb2f..ca52852 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -33,11 +33,7 @@ import 
org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -114,30 +110,6 @@ public class WindowEvaluatorFactoryTest {
   }
 
   @Test
-  public void nullWindowFunSucceeds() throws Exception {
-    Bound<Long> transform =
-        Window.<Long>triggering(
-                
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)))
-            .accumulatingFiredPanes();
-    PCollection<Long> triggering = input.apply(transform);
-
-    CommittedBundle<Long> inputBundle = createInputBundle();
-
-    UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, 
inputBundle);
-
-    TransformResult<Long> result = runEvaluator(triggering, inputBundle, 
transform);
-
-    assertThat(
-        Iterables.getOnlyElement(result.getOutputBundles()),
-        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
-    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
-    assertThat(
-        committed.getElements(),
-        containsInAnyOrder(
-            valueInIntervalWindow, valueInGlobalWindow, 
valueInGlobalAndTwoIntervalWindows));
-  }
-
-  @Test
   public void singleWindowFnSucceeds() throws Exception {
     Duration windowDuration = Duration.standardDays(7);
     Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
@@ -150,7 +122,7 @@ public class WindowEvaluatorFactoryTest {
     BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, 
EPOCH.plus(windowDuration));
     BoundedWindow thirdWindow = new 
IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle, 
transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -185,7 +157,7 @@ public class WindowEvaluatorFactoryTest {
     CommittedBundle<Long> inputBundle = createInputBundle();
     UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, 
inputBundle);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle, 
transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -242,7 +214,7 @@ public class WindowEvaluatorFactoryTest {
     CommittedBundle<Long> inputBundle = createInputBundle();
     UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, 
inputBundle);
 
-    TransformResult<Long> result = runEvaluator(windowed, inputBundle, 
transform);
+    TransformResult<Long> result = runEvaluator(windowed, inputBundle);
 
     assertThat(
         Iterables.getOnlyElement(result.getOutputBundles()),
@@ -307,17 +279,9 @@ public class WindowEvaluatorFactoryTest {
   }
 
   private TransformResult<Long> runEvaluator(
-      PCollection<Long> windowed,
-      CommittedBundle<Long> inputBundle,
-      Window.Bound<Long> windowTransform /* Required while Window.Bound is a 
composite */)
-      throws Exception {
+      PCollection<Long> windowed, CommittedBundle<Long> inputBundle) throws 
Exception {
     TransformEvaluator<Long> evaluator =
-        factory.forApplication(
-            AppliedPTransform
-                .<PCollection<Long>, PCollection<Long>,
-                    PTransform<PCollection<Long>, PCollection<Long>>>
-                    of("Window", input.expand(), windowed.expand(), 
windowTransform, p),
-            inputBundle);
+        factory.forApplication(DirectGraphs.getProducer(windowed), 
inputBundle);
 
     evaluator.processElement(valueInGlobalWindow);
     evaluator.processElement(valueInGlobalAndTwoIntervalWindows);

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
index acc204d..f043c90 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
@@ -110,7 +110,7 @@ class FlinkBatchTransformTranslators {
 
     TRANSLATORS.put(Flatten.PCollections.class, new 
FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslatorBatch());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch());
 
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundTranslatorBatch());
     TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiTranslatorBatch());
@@ -145,11 +145,11 @@ class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class WindowBoundTranslatorBatch<T>
-      implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Bound<T>> {
+  private static class WindowAssignTranslatorBatch<T>
+      implements 
FlinkBatchPipelineTranslator.BatchTransformTranslator<Window.Assign<T>> {
 
     @Override
-    public void translateNode(Window.Bound<T> transform, 
FlinkBatchTranslationContext context) {
+    public void translateNode(Window.Assign<T> transform, 
FlinkBatchTranslationContext context) {
       PValue input = context.getInput(transform);
 
       TypeInformation<WindowedValue<T>> resultTypeInfo =

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 03f567d..c7df91d 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -124,7 +124,7 @@ class FlinkStreamingTransformTranslators {
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
     TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiStreamingTranslator());
 
-    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
+    TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator());
     TRANSLATORS.put(Flatten.PCollections.class, new 
FlattenPCollectionTranslator());
     TRANSLATORS.put(
         FlinkStreamingViewOverrides.CreateFlinkPCollectionView.class,
@@ -702,12 +702,12 @@ class FlinkStreamingTransformTranslators {
     }
   }
 
-  private static class WindowBoundTranslator<T>
-      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+  private static class WindowAssignTranslator<T>
+      extends 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Assign<T>> {
 
     @Override
     public void translateNode(
-        Window.Bound<T> transform,
+        Window.Assign<T> transform,
         FlinkStreamingTranslationContext context) {
 
       @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
index f241ad0..c3a5095 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
@@ -26,7 +26,7 @@ import org.apache.flink.util.Collector;
 
 /**
  * Flink {@link FlatMapFunction} for implementing
- * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound}.
+ * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
  */
 public class FlinkAssignWindows<T, W extends BoundedWindow>
     implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index fe5db5a..7e559e9 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -85,7 +85,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.Bound;
 import org.apache.beam.sdk.util.AppliedCombineFn;
 import org.apache.beam.sdk.util.CloudObject;
 import org.apache.beam.sdk.util.PropertyNames;
@@ -878,14 +877,14 @@ public class DataflowPipelineTranslator {
         });
 
     registerTransformTranslator(
-        Window.Bound.class,
-        new TransformTranslator<Bound>() {
+        Window.Assign.class,
+        new TransformTranslator<Window.Assign>() {
           @Override
-          public void translate(Window.Bound transform, TranslationContext 
context) {
+          public void translate(Window.Assign transform, TranslationContext 
context) {
             translateHelper(transform, context);
           }
 
-          private <T> void translateHelper(Window.Bound<T> transform, 
TranslationContext context) {
+          private <T> void translateHelper(Window.Assign<T> transform, 
TranslationContext context) {
             StepTranslationContext stepContext = context.addStep(transform, 
"Bucket");
             PCollection<T> input = context.getInput(transform);
             stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7fc09ad..8ebb496 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -611,10 +611,10 @@ public final class TransformTranslator {
     rdd.saveAsNewAPIHadoopFile(outputDir, keyClass, valueClass, formatClass, 
conf);
   }
 
-  private static <T, W extends BoundedWindow> 
TransformEvaluator<Window.Bound<T>> window() {
-    return new TransformEvaluator<Window.Bound<T>>() {
+  private static <T, W extends BoundedWindow> 
TransformEvaluator<Window.Assign<T>> window() {
+    return new TransformEvaluator<Window.Assign<T>>() {
       @Override
-      public void evaluate(Window.Bound<T> transform, EvaluationContext 
context) {
+      public void evaluate(Window.Assign<T> transform, EvaluationContext 
context) {
         @SuppressWarnings("unchecked")
         JavaRDD<WindowedValue<T>> inRDD =
             ((BoundedDataset<T>) context.borrowDataset(transform)).getRDD();
@@ -734,7 +734,7 @@ public final class TransformTranslator {
     EVALUATORS.put(View.AsSingleton.class, viewAsSingleton());
     EVALUATORS.put(View.AsIterable.class, viewAsIter());
     EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
-    EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Window.Assign.class, window());
     // mostly test evaluators
     EVALUATORS.put(StorageLevelPTransform.class, storageLevel());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 6b27436..158593e 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -101,14 +101,14 @@ public final class TranslationUtils {
    * with triggering or allowed lateness).
    * </p>
    *
-   * @param transform The {@link Window.Bound} transformation.
+   * @param transform The {@link Window.Assign} transformation.
    * @param context   The {@link EvaluationContext}.
    * @param <T>       PCollection type.
    * @param <W>       {@link BoundedWindow} type.
    * @return if to apply the transformation.
    */
   public static <T, W extends BoundedWindow> boolean
-  skipAssignWindows(Window.Bound<T> transform, EvaluationContext context) {
+  skipAssignWindows(Window.Assign<T> transform, EvaluationContext context) {
     @SuppressWarnings("unchecked")
     WindowFn<? super T, W> windowFn = (WindowFn<? super T, W>) 
transform.getWindowFn();
     return windowFn == null

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index a856897..e3445bf 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -210,10 +210,10 @@ final class StreamingTransformTranslator {
     };
   }
 
-  private static <T, W extends BoundedWindow> 
TransformEvaluator<Window.Bound<T>> window() {
-    return new TransformEvaluator<Window.Bound<T>>() {
+  private static <T, W extends BoundedWindow> 
TransformEvaluator<Window.Assign<T>> window() {
+    return new TransformEvaluator<Window.Assign<T>>() {
       @Override
-      public void evaluate(final Window.Bound<T> transform, EvaluationContext 
context) {
+      public void evaluate(final Window.Assign<T> transform, EvaluationContext 
context) {
         @SuppressWarnings("unchecked")
         UnboundedDataset<T> unboundedDataset =
             ((UnboundedDataset<T>) context.borrowDataset(transform));
@@ -444,7 +444,7 @@ final class StreamingTransformTranslator {
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
     EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
     EVALUATORS.put(CreateStream.class, createFromQueue());
-    EVALUATORS.put(Window.Bound.class, window());
+    EVALUATORS.put(Window.Assign.class, window());
     EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 65dfaa9..94870ff 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -22,6 +22,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.joda.time.Duration;
 
 /**
@@ -152,7 +154,7 @@ public class Window {
      *
      * <p>This is the default behavior.
      */
-    FIRE_IF_NON_EMPTY;
+    FIRE_IF_NON_EMPTY
 
   }
 
@@ -469,8 +471,16 @@ public class Window {
     public PCollection<T> expand(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
           getOutputStrategyInternal(input.getWindowingStrategy());
-      return PCollection.createPrimitiveOutputInternal(
-          input.getPipeline(), outputStrategy, input.isBounded());
+      if (windowFn == null) {
+        // No WindowFn: only the windowing strategy changes, so existing window assignments are
+        // kept as-is. A new PCollection must still be created in case input is reused in a
+        // different location, as the two PCollections will, in general, have different windowing
+        // strategies. Flattening a single-element PCollectionList yields that new PCollection
+        // without an opaque DoFn.
+        return PCollectionList.of(input)
+            .apply(Flatten.<T>pCollections())
+            .setWindowingStrategyInternal(outputStrategy);
+      } else {
+        // A WindowFn is present: this is the AssignWindows primitive, which assigns elements to
+        // windows using the WindowFn of outputStrategy.
+        return input.apply(new Assign<T>(outputStrategy));
+      }
     }
 
     @Override
@@ -522,6 +532,33 @@ public class Window {
     }
   }
 
+
+  /**
+   * A primitive {@link PTransform} that assigns elements to windows based on a {@link WindowFn}.
+   *
+   * <p>The output {@link PCollection} carries the updated {@link WindowingStrategy}; runners are
+   * expected to assign windows using the {@link WindowFn} returned by {@link #getWindowFn()}.
+   */
+  public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private final WindowingStrategy<T, ?> updatedStrategy;
+
+    /**
+     * Create a new {@link Assign} where the output is windowed with the updated {@link
+     * WindowingStrategy}. Windows should be assigned using the {@link WindowFn} returned by
+     * {@link #getWindowFn()}.
+     *
+     * @param updatedStrategy the windowing strategy of the output. Its {@link WindowFn} is assumed
+     *     to accept elements of type {@code T}; this is not checked at runtime.
+     */
+    @SuppressWarnings("unchecked")
+    private Assign(WindowingStrategy<?, ?> updatedStrategy) {
+      // Window.Bound#expand only holds a WindowingStrategy<?, ?>; narrowing it to T here is the
+      // single, explicit unchecked step (previously hidden behind a raw-typed parameter).
+      this.updatedStrategy = (WindowingStrategy<T, ?>) updatedStrategy;
+    }
+
+    /**
+     * Creates the primitive output; the elements themselves are produced by the runner, which
+     * performs the window assignment.
+     */
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return PCollection.createPrimitiveOutputInternal(
+          input.getPipeline(), updatedStrategy, input.isBounded());
+    }
+
+    /** Returns the {@link WindowFn} of the updated strategy, used to assign windows. */
+    public WindowFn<T, ?> getWindowFn() {
+      return updatedStrategy.getWindowFn();
+    }
+  }
+
   /**
    * Creates a {@code Window} {@code PTransform} that does not change assigned
    * windows, but will cause windows to be merged again as part of the next

http://git-wip-us.apache.org/repos/asf/beam/blob/eaf9b9b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 55c7297..1101ebc 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.not;
@@ -30,15 +31,24 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
@@ -153,6 +163,51 @@ public class WindowTest implements Serializable {
     assertEquals(fixed25, strategy.getWindowFn());
   }
 
+  /**
+   * With {@link #testWindowIntoNullWindowFnNoAssign()}, demonstrates that the expansion of the
+   * {@link Window.Bound} transform depends on whether it actually assigns elements to windows:
+   * when a {@link WindowFn} is supplied, a {@link Window.Assign} primitive is present.
+   */
+  @Test
+  public void testWindowIntoWindowFnAssign() {
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(
+            Window.<Integer>into(
+                FixedWindows.of(Duration.standardMinutes(11L).plus(Duration.millis(1L)))));
+
+    final AtomicBoolean foundAssign = new AtomicBoolean(false);
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+            if (node.getTransform() instanceof Window.Assign) {
+              foundAssign.set(true);
+            }
+          }
+        });
+    assertThat(foundAssign.get(), is(true));
+  }
+
+  /**
+   * With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansion of the
+   * {@link Window.Bound} transform depends on whether it actually assigns elements to windows:
+   * when only the trigger, allowed lateness and accumulation mode are changed (no {@link
+   * WindowFn}), no {@link Window.Assign} primitive appears.
+   */
+  @Test
+  public void testWindowIntoNullWindowFnNoAssign() {
+    pipeline
+        .apply(Create.of(1, 2, 3))
+        .apply(
+            Window.<Integer>triggering(AfterWatermark.pastEndOfWindow())
+                .withAllowedLateness(Duration.ZERO)
+                .accumulatingFiredPanes());
+
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+            assertThat(node.getTransform(), not(instanceOf(Window.Assign.class)));
+          }
+        });
+  }
+
   @Test
   public void testWindowGetName() {
     assertEquals("Window.Into()",
@@ -220,6 +275,87 @@ public class WindowTest implements Serializable {
       .apply("Trigger", Window.<String>triggering(trigger));
   }
 
+  private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, 
IntervalWindow> {
+    private static final IntervalWindow EVEN_WINDOW =
+        new IntervalWindow(
+            BoundedWindow.TIMESTAMP_MIN_VALUE, 
GlobalWindow.INSTANCE.maxTimestamp());
+    private static final IntervalWindow ODD_WINDOW =
+        new IntervalWindow(
+            BoundedWindow.TIMESTAMP_MIN_VALUE, 
GlobalWindow.INSTANCE.maxTimestamp().minus(1));
+
+    @Override
+    public Collection<IntervalWindow> assignWindows(AssignContext c) throws 
Exception {
+      if (c.element() % 2 == 0) {
+        return Collections.singleton(EVEN_WINDOW);
+      }
+      return Collections.singleton(ODD_WINDOW);
+    }
+
+    @Override
+    public boolean isCompatible(WindowFn<?, ?> other) {
+      return other instanceof WindowOddEvenBuckets;
+    }
+
+    @Override
+    public Coder<IntervalWindow> windowCoder() {
+      return new IntervalWindow.IntervalWindowCoder();
+    }
+
+    @Override
+    public IntervalWindow getSideInputWindow(BoundedWindow window) {
+      throw new UnsupportedOperationException(
+          String.format("Can't use %s for side inputs", 
getClass().getSimpleName()));
+    }
+  }
+
+  /**
+   * Changing only the windowing strategy (no {@link WindowFn}) must not re-assign windows: elements
+   * keep the windows they already have, even after their type changed so that the original
+   * {@link WindowFn} could no longer be applied to them.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testNoWindowFnDoesNotReassignWindows() {
+    pipeline.enableAbandonedNodeEnforcement(true);
+
+    final PCollection<Long> initialWindows =
+        pipeline
+            .apply(CountingInput.upTo(10L))
+            .apply("AssignWindows", Window.into(new WindowOddEvenBuckets()));
+
+    // Sanity check the window assignment to demonstrate the baseline
+    PAssert.that(initialWindows)
+        .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
+        .containsInAnyOrder(0L, 2L, 4L, 6L, 8L);
+    PAssert.that(initialWindows)
+        .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
+        .containsInAnyOrder(1L, 3L, 5L, 7L, 9L);
+
+    // Change the element type, so the original WindowFn (which only accepts Longs) could no
+    // longer be applied to these elements.
+    PCollection<Boolean> parities =
+        initialWindows.apply(
+            "ModifyTypes",
+            MapElements.<Long, Boolean>via(
+                new SimpleFunction<Long, Boolean>() {
+                  @Override
+                  public Boolean apply(Long input) {
+                    return input % 2 == 0;
+                  }
+                }));
+    PAssert.that(parities)
+        .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
+        .containsInAnyOrder(true, true, true, true, true);
+    PAssert.that(parities)
+        .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
+        .containsInAnyOrder(false, false, false, false, false);
+
+    // Updating only the trigger must leave every element in the window it is already in. If the
+    // runner re-applied the original WindowFn it would crash, since a Boolean cannot be converted
+    // into a Long.
+    PCollection<Boolean> updatedTrigger =
+        parities.apply(
+            "UpdateWindowingStrategy",
+            Window.<Boolean>triggering(Never.ever())
+                .withAllowedLateness(Duration.ZERO)
+                .accumulatingFiredPanes());
+    PAssert.that(updatedTrigger)
+        .inWindow(WindowOddEvenBuckets.EVEN_WINDOW)
+        .containsInAnyOrder(true, true, true, true, true);
+    PAssert.that(updatedTrigger)
+        .inWindow(WindowOddEvenBuckets.ODD_WINDOW)
+        .containsInAnyOrder(false, false, false, false, false);
+    pipeline.run();
+  }
+
   /**
    * Tests that when two elements are combined via a GroupByKey their output 
timestamp agrees
    * with the windowing function default, the end of the window.

Reply via email to