[ 
https://issues.apache.org/jira/browse/BEAM-4090?focusedWorklogId=102304&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102304
 ]

ASF GitHub Bot logged work on BEAM-4090:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/May/18 22:10
            Start Date: 15/May/18 22:10
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5194: [BEAM-4090] Introduce TimerProvider
URL: https://github.com/apache/beam/pull/5194
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based (foreign) pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
index 59d3043450a..fb0660e8e3a 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleProcessor.java
@@ -24,7 +24,8 @@
  * An executor that is capable of processing some bundle of input over some 
executable stage or
  * step.
  */
-interface BundleProcessor<BundleT extends Bundle<?>, ExecutableT> {
+interface BundleProcessor<
+    CollectionT, BundleT extends Bundle<?, ? extends CollectionT>, 
ExecutableT> {
   /**
    * Execute the provided bundle using the provided Executable, calling back 
to the {@link
    * CompletionCallback} when execution completes.
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
index 07c3d22d37f..5522dac8627 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedBundle.java
@@ -19,10 +19,8 @@
 package org.apache.beam.runners.direct;
 
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.local.Bundle;
 import org.apache.beam.runners.local.StructuralKey;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -35,7 +33,7 @@
  * a part of at a later point.
  * @param <T> the type of elements contained within this bundle
  */
-interface CommittedBundle<T> extends Bundle<T> {
+interface CommittedBundle<T> extends Bundle<T, PCollection<T>> {
   /**
    * Returns the PCollection that the elements of this bundle belong to.
    */
@@ -43,7 +41,7 @@
   PCollection<T> getPCollection();
 
   /**
-   * Returns the key that was output in the most recent {@link GroupByKey} in 
the
+   * Returns the key that was output in the most recent {@code GroupByKey} in 
the
    * execution of this bundle.
    */
   StructuralKey<?> getKey();
@@ -54,11 +52,16 @@
    */
   Iterable<WindowedValue<T>> getElements();
 
-  @Override
+  /**
+   * Return the minimum timestamp among elements in this bundle.
+   *
+   * <p>This should be equivalent to iterating over all of the elements within 
a bundle and
+   * selecting the minimum timestamp from among them.
+   */
   Instant getMinimumTimestamp();
 
   /**
-   * Returns the processing time output watermark at the time the producing 
{@link PTransform}
+   * Returns the processing time output watermark at the time the producing 
{@code Executable}
    * committed this bundle. Downstream synchronized processing time watermarks 
cannot progress
    * past this point before consuming this bundle.
    *
@@ -67,7 +70,6 @@
    * timers that fired to produce this bundle.
    */
   Instant getSynchronizedProcessingOutputWatermark();
-
   /**
    * Return a new {@link CommittedBundle} that is like this one, except calls 
to
    * {@link #getElements()} will return the provided elements. This bundle is 
unchanged.
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index bfa65cd2d8e..4053b8473e5 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -36,7 +36,7 @@
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.TimerProvider.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.Pipeline;
@@ -77,7 +77,8 @@
   private final BundleFactory bundleFactory;
 
   /** The current processing time and event time watermarks and timers. */
-  private final WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> 
watermarkManager;
+  private final WatermarkManager<AppliedPTransform<?, ?, ?>, ? super 
PCollection<?>>
+      watermarkManager;
 
   /** Executes callbacks based on the progression of the watermark. */
   private final WatermarkCallbackExecutor callbackExecutor;
@@ -121,7 +122,7 @@ private EvaluationContext(
 
   public void initialize(
       Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>> 
initialInputs) {
-    watermarkManager.initialize(initialInputs);
+    watermarkManager.initialize((Map) initialInputs);
   }
 
   /**
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index ab759287637..cce4d7e1947 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -44,6 +44,7 @@
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -55,7 +56,8 @@
  * EvaluationContext} to execute a {@link Pipeline}.
  */
 final class ExecutorServiceParallelExecutor
-    implements PipelineExecutor, BundleProcessor<CommittedBundle<?>, 
AppliedPTransform<?, ?, ?>> {
+    implements PipelineExecutor,
+        BundleProcessor<PCollection<?>, CommittedBundle<?>, 
AppliedPTransform<?, ?, ?>> {
   private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
 
   private final int targetParallelism;
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
index e3632697164..3c37c4f278e 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
@@ -32,7 +32,7 @@
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.TimerProvider.FiredTimers;
 import org.apache.beam.runners.local.ExecutionDriver;
 import org.apache.beam.runners.local.PipelineMessageReceiver;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -52,7 +52,8 @@
   public static ExecutionDriver create(
       EvaluationContext context,
       DirectGraph graph,
-      BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> 
bundleProcessor,
+      BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, 
?, ?>>
+          bundleProcessor,
       PipelineMessageReceiver messageReceiver,
       Map<AppliedPTransform<?, ?, ?>, 
ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) {
     return new QuiescenceDriver(context, graph, bundleProcessor, 
messageReceiver, initialBundles);
@@ -60,7 +61,8 @@ public static ExecutionDriver create(
 
   private final EvaluationContext evaluationContext;
   private final DirectGraph graph;
-  private final BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, 
?>> bundleProcessor;
+  private final BundleProcessor<PCollection<?>, CommittedBundle<?>, 
AppliedPTransform<?, ?, ?>>
+      bundleProcessor;
   private final PipelineMessageReceiver pipelineMessageReceiver;
 
   private final CompletionCallback defaultCompletionCallback =
@@ -78,7 +80,8 @@ public static ExecutionDriver create(
   private QuiescenceDriver(
       EvaluationContext evaluationContext,
       DirectGraph graph,
-      BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> 
bundleProcessor,
+      BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, 
?, ?>>
+          bundleProcessor,
       PipelineMessageReceiver pipelineMessageReceiver,
       Map<AppliedPTransform<?, ?, ?>, 
ConcurrentLinkedQueue<CommittedBundle<?>>>
           pendingRootBundles) {
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TimerProvider.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TimerProvider.java
new file mode 100644
index 00000000000..3d8e3f0ccba
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TimerProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import com.google.auto.value.AutoValue;
+import java.util.Collection;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.local.StructuralKey;
+
+/** A provider of {@link FiredTimers}. */
+interface TimerProvider<ExecutableT> {
+  /**
+   * Extracts and returns all of the timers which have fired but not yet been 
returned by a call to
+   * this method.
+   */
+  Collection<FiredTimers<ExecutableT>> extractFiredTimers();
+
+  /**
+   * A pair of {@link TimerData} and key which can be delivered to the 
appropriate {@code
+   * ExecutableT}. A timer fires at the executable that set it with a specific 
key when the time
+   * domain in which it lives progresses past a specified time, as determined 
by the {@link
+   * WatermarkManager}.
+   */
+  @AutoValue
+  abstract class FiredTimers<ExecutableT> {
+    static <ExecutableT> FiredTimers<ExecutableT> create(
+        ExecutableT executable, StructuralKey<?> key, Collection<TimerData> 
timers) {
+      return new AutoValue_TimerProvider_FiredTimers(executable, key, timers);
+    }
+
+    /** The executable the timers were set at and will be delivered to. */
+    public abstract ExecutableT getExecutable();
+
+    /** The key the timers were set for and will be delivered to. */
+    public abstract StructuralKey<?> getKey();
+
+    /**
+     * Gets all of the timers that have fired within the provided {@link 
TimeDomain}. If no timers
+     * fired within the provided domain, return an empty collection.
+     *
+     * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of 
increasing timestamp.
+     */
+    public abstract Collection<TimerData> getTimers();
+  }
+}
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 86e904655ce..c77ff9a389f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -54,6 +54,7 @@
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.local.Bundle;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -61,7 +62,6 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
 import org.joda.time.Instant;
 
 /**
@@ -126,7 +126,7 @@
  * Watermark_PCollection = Watermark_Out_ProducingPTransform
  * </pre>
  */
-class WatermarkManager<ExecutableT, CollectionT> {
+class WatermarkManager<ExecutableT, CollectionT> implements 
TimerProvider<ExecutableT> {
   // The number of updates to apply in #tryApplyPendingUpdates
   private static final int MAX_INCREMENTAL_UPDATES = 10;
 
@@ -211,7 +211,7 @@ public static WatermarkUpdate fromTimestamps(Instant 
oldTime, Instant currentTim
    */
   @VisibleForTesting static class AppliedPTransformInputWatermark implements 
Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
-    private final SortedMultiset<CommittedBundle<?>> pendingElements;
+    private final SortedMultiset<Bundle<?, ?>> pendingElements;
 
     // This tracks only the quantity of timers at each timestamp, for quickly 
getting the cross-key
     // minimum
@@ -233,7 +233,7 @@ public AppliedPTransformInputWatermark(Collection<? extends 
Watermark> inputWate
       // be consumed without modifications.
       //
       // The same logic is applied for pending timers
-      Ordering<CommittedBundle<?>> pendingBundleComparator =
+      Ordering<Bundle<?, ?>> pendingBundleComparator =
           new 
BundleByElementTimestampComparator().compound(Ordering.arbitrary());
       this.pendingElements =
           TreeMultiset.create(pendingBundleComparator);
@@ -280,11 +280,11 @@ public synchronized WatermarkUpdate refresh() {
       return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
     }
 
-    private synchronized void addPending(CommittedBundle<?> newPending) {
+    private synchronized void addPending(Bundle<?, ?> newPending) {
       pendingElements.add(newPending);
     }
 
-    private synchronized void removePending(CommittedBundle<?> completed) {
+    private synchronized void removePending(Bundle<?, ?> completed) {
       pendingElements.remove(completed);
     }
 
@@ -446,7 +446,7 @@ public synchronized String toString() {
    */
   private static class SynchronizedProcessingTimeInputWatermark implements 
Watermark {
     private final Collection<? extends Watermark> inputWms;
-    private final Collection<CommittedBundle<?>> pendingBundles;
+    private final Collection<Bundle<?, ?>> pendingBundles;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> 
processingTimers;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> 
synchronizedProcessingTimers;
     private final Map<StructuralKey<?>, Table<StateNamespace, String, 
TimerData>> existingTimers;
@@ -494,7 +494,7 @@ public synchronized WatermarkUpdate refresh() {
       for (Watermark input : inputWms) {
         minTime = INSTANT_ORDERING.min(minTime, input.get());
       }
-      for (CommittedBundle<?> bundle : pendingBundles) {
+      for (Bundle<?, ?> bundle : pendingBundles) {
         // TODO: Track elements in the bundle by the processing time they were 
output instead of
         // entire bundles. Requried to support arbitrarily splitting and 
merging bundles between
         // steps
@@ -504,11 +504,11 @@ public synchronized WatermarkUpdate refresh() {
       return WatermarkUpdate.fromTimestamps(oldHold, minTime);
     }
 
-    public synchronized void addPending(CommittedBundle<?> bundle) {
+    public synchronized void addPending(Bundle<?, ?> bundle) {
       pendingBundles.add(bundle);
     }
 
-    public synchronized void removePending(CommittedBundle<?> bundle) {
+    public synchronized void removePending(Bundle<?, ?> bundle) {
       pendingBundles.remove(bundle);
     }
 
@@ -785,7 +785,7 @@ public Instant get() {
    * @param clock the clock to use to determine processing time
    * @param graph the graph representing this pipeline
    */
-  public static WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> create(
+  public static WatermarkManager<AppliedPTransform<?, ?, ?>, ? super 
PCollection<?>> create(
       Clock clock, DirectGraph graph) {
     return new WatermarkManager<>(clock, graph);
   }
@@ -879,13 +879,14 @@ public TransformWatermarks getWatermarks(ExecutableT 
executable) {
     return transformToWatermarks.get(executable);
   }
 
-  public void initialize(Map<ExecutableT, ? extends 
Iterable<CommittedBundle<?>>> initialBundles) {
+  public void initialize(
+      Map<ExecutableT, ? extends Iterable<Bundle<?,  CollectionT>>> 
initialBundles) {
     refreshLock.lock();
     try {
-      for (Map.Entry<ExecutableT, ? extends Iterable<CommittedBundle<?>>> 
rootEntry :
+      for (Map.Entry<ExecutableT, ? extends Iterable<Bundle<?, CollectionT>>> 
rootEntry :
           initialBundles.entrySet()) {
         TransformWatermarks rootWms = 
transformToWatermarks.get(rootEntry.getKey());
-        for (CommittedBundle<?> initialBundle : rootEntry.getValue()) {
+        for (Bundle<?, ? extends CollectionT> initialBundle : 
rootEntry.getValue()) {
           rootWms.addPending(initialBundle);
         }
         pendingRefreshes.add(rootEntry.getKey());
@@ -920,11 +921,11 @@ public void initialize(Map<ExecutableT, ? extends 
Iterable<CommittedBundle<?>>>
    * @param earliestHold the earliest watermark hold in the executable's state.
    */
   public void updateWatermarks(
-      @Nullable CommittedBundle<?> completed,
+      @Nullable Bundle<?, ? extends CollectionT> completed,
       TimerUpdate timerUpdate,
       ExecutableT executable,
-      @Nullable CommittedBundle<?> unprocessedInputs,
-      Iterable<? extends CommittedBundle<?>> outputs,
+      @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
+      Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
       Instant earliestHold) {
     pendingUpdates.offer(
         PendingWatermarkUpdate.create(
@@ -961,16 +962,16 @@ private void applyAllPendingUpdates() {
   @GuardedBy("refreshLock")
   private void applyNUpdates(int numUpdates) {
     for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates 
<= 0); i++) {
-      PendingWatermarkUpdate<ExecutableT> pending = pendingUpdates.poll();
+      PendingWatermarkUpdate<ExecutableT, CollectionT> pending = 
pendingUpdates.poll();
       applyPendingUpdate(pending);
       pendingRefreshes.add(pending.getExecutable());
     }
   }
 
   /** Apply a {@link PendingWatermarkUpdate} to the {@link WatermarkManager}. 
*/
-  private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT> pending) 
{
+  private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT, 
CollectionT> pending) {
     ExecutableT executable = pending.getExecutable();
-    CommittedBundle<?> inputBundle = pending.getInputBundle();
+    Bundle<?, ? extends CollectionT> inputBundle = pending.getInputBundle();
 
     updatePending(
         inputBundle,
@@ -996,19 +997,19 @@ private void 
applyPendingUpdate(PendingWatermarkUpdate<ExecutableT> pending) {
    * watermark but the element it produced is not yet pending. This can cause 
the watermark to
    * erroneously advance.
    *
-   * <p>See {@link #updateWatermarks(CommittedBundle, TimerUpdate, Object, 
CommittedBundle,
+   * <p>See {@link #updateWatermarks(Bundle, TimerUpdate, Object, Bundle,
    * Iterable, Instant)} for information about the parameters of this method.
    */
   private void updatePending(
-      CommittedBundle<?> input,
+      Bundle<?, ? extends CollectionT> input,
       TimerUpdate timerUpdate,
       ExecutableT executable,
-      @Nullable CommittedBundle<?> unprocessedInputs,
-      Iterable<? extends CommittedBundle<?>> outputs) {
+      @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
+      Iterable<? extends Bundle<?, ? extends CollectionT>> outputs) {
     // Newly pending elements must be added before completed elements are 
removed, as the two
     // do not share a Mutex within this call and thus can be interleaved with 
external calls to
     // refresh.
-    for (CommittedBundle<?> bundle : outputs) {
+    for (Bundle<?, ? extends CollectionT> bundle : outputs) {
       for (ExecutableT consumer :
           // TODO: Remove this cast once CommittedBundle returns a CollectionT
           graph.getPerElementConsumers((CollectionT) bundle.getPCollection())) 
{
@@ -1271,12 +1272,12 @@ private void setEventTimeHold(Object key, Instant 
newHold) {
       outputWatermark.updateHold(key, newHold);
     }
 
-    private void removePending(CommittedBundle<?> bundle) {
+    private void removePending(Bundle<?, ?> bundle) {
       inputWatermark.removePending(bundle);
       synchronizedProcessingInputWatermark.removePending(bundle);
     }
 
-    private void addPending(CommittedBundle<?> bundle) {
+    private void addPending(Bundle<?, ?> bundle) {
       inputWatermark.addPending(bundle);
       synchronizedProcessingInputWatermark.addPending(bundle);
     }
@@ -1297,7 +1298,7 @@ private void addPending(CommittedBundle<?> bundle) {
       for (Map.Entry<StructuralKey<?>, List<TimerData>> firedTimers :
           timersPerKey.entrySet()) {
         keyFiredTimers.add(
-            new FiredTimers<>(executable, firedTimers.getKey(), 
firedTimers.getValue()));
+            FiredTimers.create(executable, firedTimers.getKey(), 
firedTimers.getValue()));
       }
       return keyFiredTimers;
     }
@@ -1481,54 +1482,10 @@ public boolean equals(Object other) {
     }
   }
 
-  /**
-   * A pair of {@link TimerData} and key which can be delivered to the 
appropriate
-   * {@link AppliedPTransform}. A timer fires at the executable that set it 
with a specific key when
-   * the time domain in which it lives progresses past a specified time, as 
determined by the
-   * {@link WatermarkManager}.
-   */
-  public static class FiredTimers<ExecutableT> {
-    /** The executable the timers were set at and will be delivered to. */
-    private final ExecutableT executable;
-    /** The key the timers were set for and will be delivered to. */
-    private final StructuralKey<?> key;
-    private final Collection<TimerData> timers;
-
-    private FiredTimers(
-        ExecutableT executable, StructuralKey<?> key, Collection<TimerData> 
timers) {
-      this.executable = executable;
-      this.key = key;
-      this.timers = timers;
-    }
-
-    public ExecutableT getExecutable() {
-      return executable;
-    }
-
-    public StructuralKey<?> getKey() {
-      return key;
-    }
-
-    /**
-     * Gets all of the timers that have fired within the provided {@link 
TimeDomain}. If no timers
-     * fired within the provided domain, return an empty collection.
-     *
-     * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of 
increasing timestamp.
-     */
-    public Collection<TimerData> getTimers() {
-      return timers;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(FiredTimers.class).add("timers", 
timers).toString();
-    }
-  }
-
-  private static class BundleByElementTimestampComparator extends 
Ordering<CommittedBundle<?>>
+  private static class BundleByElementTimestampComparator extends 
Ordering<Bundle<?, ?>>
       implements Serializable {
     @Override
-    public int compare(CommittedBundle<?> o1, CommittedBundle<?> o2) {
+    public int compare(Bundle<?, ?> o1, Bundle<?, ?> o2) {
       return ComparisonChain.start()
           .compare(o1.getMinimumTimestamp(), o2.getMinimumTimestamp())
           .result();
@@ -1536,28 +1493,29 @@ public int compare(CommittedBundle<?> o1, 
CommittedBundle<?> o2) {
   }
 
   @AutoValue
-  abstract static class PendingWatermarkUpdate<ExecutableT> {
+  abstract static class PendingWatermarkUpdate<ExecutableT, CollectionT> {
     abstract ExecutableT getExecutable();
 
     @Nullable
-    abstract CommittedBundle<?> getInputBundle();
+    abstract Bundle<?, ? extends CollectionT> getInputBundle();
 
     abstract TimerUpdate getTimerUpdate();
 
     @Nullable
-    abstract CommittedBundle<?> getUnprocessedInputs();
+    abstract Bundle<?, ? extends CollectionT> getUnprocessedInputs();
 
-    abstract Iterable<? extends CommittedBundle<?>> getOutputs();
+    abstract Iterable<? extends Bundle<?, ? extends CollectionT>> getOutputs();
 
     abstract Instant getEarliestHold();
 
-    public static <ExecutableT> PendingWatermarkUpdate<ExecutableT> create(
-        ExecutableT executable,
-        @Nullable CommittedBundle<?> inputBundle,
-        TimerUpdate timerUpdate,
-        @Nullable CommittedBundle<?> unprocessedInputs,
-        Iterable<? extends CommittedBundle<?>> outputs,
-        Instant earliestHold) {
+    public static <ExecutableT, CollectionT>
+        PendingWatermarkUpdate<ExecutableT, CollectionT> create(
+            ExecutableT executable,
+            @Nullable Bundle<?, ? extends CollectionT> inputBundle,
+            TimerUpdate timerUpdate,
+            @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
+            Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
+            Instant earliestHold) {
       return new AutoValue_WatermarkManager_PendingWatermarkUpdate<>(
           executable, inputBundle, timerUpdate, unprocessedInputs, outputs, 
earliestHold);
     }
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 800da7d57ab..477b549727c 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -37,7 +37,7 @@
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
-import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.TimerProvider.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 03764c1a1ba..07d3db98399 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -38,8 +38,8 @@
 import java.util.Map;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.direct.TimerProvider.FiredTimers;
 import 
org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark;
-import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import 
org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
@@ -65,7 +65,6 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -101,7 +100,7 @@
   private transient PCollection<Integer> intsToFlatten;
   private transient PCollection<Integer> flattened;
 
-  private transient WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> 
manager;
+  private transient WatermarkManager<AppliedPTransform<?, ?, ?>, ? super 
PCollection<?>> manager;
   private transient BundleFactory bundleFactory;
   private DirectGraph graph;
 
@@ -306,7 +305,7 @@ public void getWatermarkMultiIdenticalInput() {
 
     AppliedPTransform<?, ?, ?> theFlatten = graph.getProducer(multiConsumer);
 
-    WatermarkManager<AppliedPTransform<?, ?, ?>, PValue> tstMgr =
+    WatermarkManager<AppliedPTransform<?, ?, ?>, ? super PCollection<?>> 
tstMgr =
         WatermarkManager.create(clock, graph);
     CommittedBundle<Void> root =
         bundleFactory
@@ -323,7 +322,7 @@ public void getWatermarkMultiIdenticalInput() {
         ImmutableMap.<AppliedPTransform<?, ?, ?>, 
Collection<CommittedBundle<?>>>builder()
             .put(graph.getProducer(created), Collections.singleton(root))
             .build();
-    tstMgr.initialize(initialInputs);
+    tstMgr.initialize((Map) initialInputs);
     tstMgr.updateWatermarks(
         root,
         TimerUpdate.empty(),
diff --git 
a/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java 
b/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java
index 959494b2cff..cab9e18128d 100644
--- a/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java
+++ b/runners/local-java/src/main/java/org/apache/beam/runners/local/Bundle.java
@@ -18,11 +18,24 @@
 
 package org.apache.beam.runners.local;
 
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.joda.time.Instant;
 
 /** An immutable collection of elements which are part of a {@code 
PCollection}. */
-public interface Bundle<T> extends Iterable<WindowedValue<T>> {
+public interface Bundle<T, CollectionT> extends Iterable<WindowedValue<T>> {
+  /**
+   * Returns the PCollection that the elements of this bundle belong to.
+   */
+  @Nullable
+  CollectionT getPCollection();
+
+  /**
+   * Returns the key that was output in the most recent {@code GroupByKey} in 
the
+   * execution of this bundle.
+   */
+  StructuralKey<?> getKey();
+
   /**
    * Return the minimum timestamp among elements in this bundle.
    *
@@ -30,4 +43,16 @@
    * selecting the minimum timestamp from among them.
    */
   Instant getMinimumTimestamp();
+
+  /**
+   * Returns the processing time output watermark at the time the producing 
{@code Executable}
+   * committed this bundle. Downstream synchronized processing time watermarks 
cannot progress
+   * past this point before consuming this bundle.
+   *
+   * <p>This value is no greater than the earliest incomplete processing time 
or synchronized
+   * processing time {@link TimerData timer} at the time this bundle was 
committed, including any
+   * timers that fired to produce this bundle.
+   */
+  Instant getSynchronizedProcessingOutputWatermark();
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 102304)
    Time Spent: 40m  (was: 0.5h)

> Fork/Update Primitive Implementations for the ReferenceRunner
> -------------------------------------------------------------
>
>                 Key: BEAM-4090
>                 URL: https://issues.apache.org/jira/browse/BEAM-4090
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> The primitives that require implementation are:
> Impulse, Flatten, GroupByKey
> GroupByKey may be implemented by PartitionByKey/GroupByKeyAndWindow.
> The primitives that may be implemented as well are:
> AssignWindows, for known WindowFns



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to