Flatten FiredTimers and ExtractFiredTimers

Pass a single collection of fired timers, and have those objects contain
the associated transform and key that they fired for. Timers already
contain the domain they are in.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5dca2674
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5dca2674
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5dca2674

Branch: refs/heads/apex-runner
Commit: 5dca2674a8d145c6e619005c2282c6064cd7aab7
Parents: 6e1e57b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Nov 3 14:10:37 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Nov 4 13:05:21 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/EvaluationContext.java  |   6 +-
 .../direct/ExecutorServiceParallelExecutor.java |  41 ++++----
 .../beam/runners/direct/WatermarkManager.java   |  79 ++++++++------
 .../runners/direct/EvaluationContextTest.java   |  23 ++---
 .../runners/direct/WatermarkManagerTest.java    | 102 ++++++-------------
 5 files changed, 109 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 965e77d..b814def 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -391,11 +391,9 @@ class EvaluationContext {
    * <p>This is a destructive operation. Timers will only appear in the result of this method once
    * for each time they are set.
    */
-  public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
+  public Collection<FiredTimers> extractFiredTimers() {
     forceRefresh();
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
-        watermarkManager.extractFiredTimers();
-    return fired;
+    return watermarkManager.extractFiredTimers();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index e32f671..d1ffea1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -440,29 +439,23 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
      */
     private void fireTimers() throws Exception {
       try {
-        for (Map.Entry<
-               AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
transformTimers :
-            evaluationContext.extractFiredTimers().entrySet()) {
-          AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
-          for (Map.Entry<StructuralKey<?>, FiredTimers> keyTimers :
-              transformTimers.getValue().entrySet()) {
-            for (TimeDomain domain : TimeDomain.values()) {
-              Collection<TimerData> delivery = 
keyTimers.getValue().getTimers(domain);
-              if (delivery.isEmpty()) {
-                continue;
-              }
-              KeyedWorkItem<?, Object> work =
-                  KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), 
delivery);
-              @SuppressWarnings({"unchecked", "rawtypes"})
-              CommittedBundle<?> bundle =
-                  evaluationContext
-                      .createKeyedBundle(keyTimers.getKey(), (PCollection) 
transform.getInput())
-                      .add(WindowedValue.valueInGlobalWindow(work))
-                      .commit(evaluationContext.now());
-              scheduleConsumption(transform, bundle, new 
TimerIterableCompletionCallback(delivery));
-              state.set(ExecutorState.ACTIVE);
-            }
-          }
+        for (FiredTimers transformTimers : 
evaluationContext.extractFiredTimers()) {
+          Collection<TimerData> delivery = transformTimers.getTimers();
+          KeyedWorkItem<?, Object> work =
+              KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), 
delivery);
+          @SuppressWarnings({"unchecked", "rawtypes"})
+          CommittedBundle<?> bundle =
+              evaluationContext
+                  .createKeyedBundle(
+                      transformTimers.getKey(),
+                      (PCollection) transformTimers.getTransform().getInput())
+                  .add(WindowedValue.valueInGlobalWindow(work))
+                  .commit(evaluationContext.now());
+          scheduleConsumption(
+              transformTimers.getTransform(),
+              bundle,
+              new TimerIterableCompletionCallback(delivery));
+          state.set(ExecutorState.ACTIVE);
         }
       } catch (Exception e) {
         LOG.error("Internal Error while delivering timers", e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 31b8091..f01c13c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -740,14 +740,17 @@ public class WatermarkManager {
 
       wms =
           new TransformWatermarks(
-              inputWatermark, outputWatermark, inputProcessingWatermark, 
outputProcessingWatermark);
+              transform,
+              inputWatermark,
+              outputWatermark,
+              inputProcessingWatermark,
+              outputProcessingWatermark);
       transformToWatermarks.put(transform, wms);
     }
     return wms;
   }
 
-  private Collection<Watermark> getInputProcessingWatermarks(
-      AppliedPTransform<?, ?, ?> transform) {
+  private Collection<Watermark> 
getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) {
     ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
     Collection<? extends PValue> inputs = transform.getInput().expand();
     if (inputs.isEmpty()) {
@@ -924,15 +927,12 @@ public class WatermarkManager {
    * Returns a map of each {@link PTransform} that has pending timers to those 
timers. All of the
    * pending timers will be removed from this {@link WatermarkManager}.
    */
-  public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
extractFiredTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
allTimers = new HashMap<>();
+  public Collection<FiredTimers> extractFiredTimers() {
+    Collection<FiredTimers> allTimers = new ArrayList<>();
     for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> 
watermarksEntry :
         transformToWatermarks.entrySet()) {
-      Map<StructuralKey<?>, FiredTimers> keyFiredTimers =
-          watermarksEntry.getValue().extractFiredTimers();
-      if (!keyFiredTimers.isEmpty()) {
-        allTimers.put(watermarksEntry.getKey(), keyFiredTimers);
-      }
+      Collection<FiredTimers> firedTimers = 
watermarksEntry.getValue().extractFiredTimers();
+      allTimers.addAll(firedTimers);
     }
     return allTimers;
   }
@@ -1043,6 +1043,8 @@ public class WatermarkManager {
    * A reference to the input and output watermarks of an {@link 
AppliedPTransform}.
    */
   public class TransformWatermarks {
+    private final AppliedPTransform<?, ?, ?> transform;
+
     private final AppliedPTransformInputWatermark inputWatermark;
     private final AppliedPTransformOutputWatermark outputWatermark;
 
@@ -1053,10 +1055,12 @@ public class WatermarkManager {
     private Instant latestSynchronizedOutputWm;
 
     private TransformWatermarks(
+        AppliedPTransform<?, ?, ?> transform,
         AppliedPTransformInputWatermark inputWatermark,
         AppliedPTransformOutputWatermark outputWatermark,
         SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
         SynchronizedProcessingTimeOutputWatermark 
outputSynchProcessingWatermark) {
+      this.transform = transform;
       this.inputWatermark = inputWatermark;
       this.outputWatermark = outputWatermark;
 
@@ -1128,7 +1132,7 @@ public class WatermarkManager {
       synchronizedProcessingInputWatermark.addPending(bundle);
     }
 
-    private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
+    private Collection<FiredTimers> extractFiredTimers() {
       Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
           inputWatermark.extractFiredEventTimeTimers();
       Map<StructuralKey<?>, List<TimerData>> processingTimers;
@@ -1137,31 +1141,33 @@ public class WatermarkManager {
           TimeDomain.PROCESSING_TIME, clock.now());
       synchronizedTimers = 
synchronizedProcessingInputWatermark.extractFiredDomainTimers(
           TimeDomain.SYNCHRONIZED_PROCESSING_TIME, 
getSynchronizedProcessingInputTime());
-      Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = 
new HashMap<>();
-      groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, 
synchronizedTimers);
 
-      Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>();
-      for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> 
firedTimers :
-          groupedTimers.entrySet()) {
-        keyFiredTimers.put(firedTimers.getKey(), new 
FiredTimers(firedTimers.getValue()));
+      Map<StructuralKey<?>, List<TimerData>> timersPerKey =
+          groupFiredTimers(eventTimeTimers, processingTimers, 
synchronizedTimers);
+      Collection<FiredTimers> keyFiredTimers = new 
ArrayList<>(timersPerKey.size());
+      for (Map.Entry<StructuralKey<?>, List<TimerData>> firedTimers :
+          timersPerKey.entrySet()) {
+        keyFiredTimers.add(
+            new FiredTimers(transform, firedTimers.getKey(), 
firedTimers.getValue()));
       }
       return keyFiredTimers;
     }
 
     @SafeVarargs
-    private final void groupFiredTimers(
-        Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> 
groupedToMutate,
+    private final Map<StructuralKey<?>, List<TimerData>> groupFiredTimers(
         Map<StructuralKey<?>, List<TimerData>>... timersToGroup) {
+      Map<StructuralKey<?>, List<TimerData>> groupedTimers = new HashMap<>();
       for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) {
         for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : 
subGroup.entrySet()) {
-          Map<TimeDomain, List<TimerData>> grouped = 
groupedToMutate.get(newTimers.getKey());
+          List<TimerData> grouped = groupedTimers.get(newTimers.getKey());
           if (grouped == null) {
-            grouped = new HashMap<>();
-            groupedToMutate.put(newTimers.getKey(), grouped);
+            grouped = new ArrayList<>();
+            groupedTimers.put(newTimers.getKey(), grouped);
           }
-          grouped.put(newTimers.getValue().get(0).getDomain(), 
newTimers.getValue());
+          grouped.addAll(newTimers.getValue());
         }
       }
+      return groupedTimers;
     }
 
     private void updateTimers(TimerUpdate update) {
@@ -1334,24 +1340,35 @@ public class WatermarkManager {
    * {@link WatermarkManager}.
    */
   public static class FiredTimers {
-    private final Map<TimeDomain, ? extends Collection<TimerData>> timers;
+    /** The transform the timers were set at and will be delivered to. */
+    private final AppliedPTransform<?, ?, ?> transform;
+    /** The key the timers were set for and will be delivered to. */
+    private final StructuralKey<?> key;
+    private final Collection<TimerData> timers;
 
-    private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> 
timers) {
+    private FiredTimers(
+        AppliedPTransform<?, ?, ?> transform, StructuralKey<?> key, 
Collection<TimerData> timers) {
+      this.transform = transform;
+      this.key = key;
       this.timers = timers;
     }
 
+    public AppliedPTransform<?, ?, ?> getTransform() {
+      return transform;
+    }
+
+    public StructuralKey<?> getKey() {
+      return key;
+    }
+
     /**
      * Gets all of the timers that have fired within the provided {@link 
TimeDomain}. If no timers
      * fired within the provided domain, return an empty collection.
      *
      * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of 
increasing timestamp.
      */
-    public Collection<TimerData> getTimers(TimeDomain domain) {
-      Collection<TimerData> domainTimers = timers.get(domain);
-      if (domainTimers == null) {
-        return Collections.emptyList();
-      }
-      return domainTimers;
+    public Collection<TimerData> getTimers() {
+      return timers;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index bc53570..e1277ac 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -373,36 +373,31 @@ public class EvaluationContextTest {
             .build();
 
     // haven't added any timers, must be empty
-    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+    assertThat(context.extractFiredTimers(), emptyIterable());
     context.handleResult(
         context.createKeyedBundle(key, created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         timerResult);
 
     // timer hasn't fired
-    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+    assertThat(context.extractFiredTimers(), emptyIterable());
 
     TransformResult advanceResult =
         
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
     // Should cause the downstream timer to fire
     context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
 
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
-        context.extractFiredTimers();
+    Collection<FiredTimers> fired = context.extractFiredTimers();
     assertThat(
-        fired,
-        Matchers.<AppliedPTransform<?, ?, 
?>>hasKey(downstream.getProducingTransformInternal()));
-    Map<StructuralKey<?>, FiredTimers> downstreamFired =
-        fired.get(downstream.getProducingTransformInternal());
-    assertThat(downstreamFired, Matchers.<Object>hasKey(key));
+        Iterables.getOnlyElement(fired).getKey(),
+        Matchers.<StructuralKey<?>>equalTo(key));
 
-    FiredTimers firedForKey = downstreamFired.get(key);
-    assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), 
emptyIterable());
-    assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), 
emptyIterable());
-    assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire));
+    FiredTimers firedForKey = Iterables.getOnlyElement(fired);
+    // Contains exclusively the fired timer
+    assertThat(firedForKey.getTimers(), contains(toFire));
 
     // Don't reextract timers
-    assertThat(context.extractFiredTimers().entrySet(), emptyIterable());
+    assertThat(context.extractFiredTimers(), emptyIterable());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index 1954005..6bde462 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
@@ -68,6 +67,7 @@ import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 import org.junit.Before;
@@ -915,12 +915,9 @@ public class WatermarkManagerTest implements Serializable {
         filteredDoubledWms.getSynchronizedProcessingOutputTime(),
         not(earlierThan(initialFilteredDoubledWm)));
 
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
firedTimers =
-        manager.extractFiredTimers();
+    Collection<FiredTimers> firedTimers = manager.extractFiredTimers();
     assertThat(
-        firedTimers.get(filtered.getProducingTransformInternal())
-            .get(key)
-            .getTimers(TimeDomain.PROCESSING_TIME),
+        Iterables.getOnlyElement(firedTimers).getTimers(),
         contains(pastTimer));
     // Our timer has fired, but has not been completed, so it holds our 
synchronized processing WM
     assertThat(filteredWms.getSynchronizedProcessingOutputTime(), 
not(laterThan(startTime)));
@@ -1099,10 +1096,9 @@ public class WatermarkManagerTest implements 
Serializable {
 
   @Test
   public void extractFiredTimersReturnsFiredEventTimeTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
initialTimers =
-        manager.extractFiredTimers();
+    Collection<FiredTimers> initialTimers = manager.extractFiredTimers();
     // Watermarks haven't advanced
-    assertThat(initialTimers.entrySet(), emptyIterable());
+    assertThat(initialTimers, emptyIterable());
 
     // Advance WM of keyed past the first timer, but ahead of the second and 
third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
@@ -1136,15 +1132,11 @@ public class WatermarkManagerTest implements 
Serializable {
         new Instant(1000L));
     manager.refreshAll();
 
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
firstTransformFiredTimers =
+    Collection<FiredTimers> firstFiredTimers =
         manager.extractFiredTimers();
-    assertThat(
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(firstFilteredTimers.get(key), not(nullValue()));
-    FiredTimers firstFired = firstFilteredTimers.get(key);
-    assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), 
contains(earliestTimer));
+    assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+    FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers);
+    assertThat(firstFired.getTimers(), contains(earliestTimer));
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
@@ -1153,24 +1145,18 @@ public class WatermarkManagerTest implements 
Serializable {
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     manager.refreshAll();
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
secondTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(secondFilteredTimers.get(key), not(nullValue()));
-    FiredTimers secondFired = secondFilteredTimers.get(key);
+    Collection<FiredTimers> secondFiredTimers = manager.extractFiredTimers();
+    assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+    FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers);
     // Contains, in order, middleTimer and then lastTimer
-    assertThat(secondFired.getTimers(TimeDomain.EVENT_TIME), 
contains(middleTimer, lastTimer));
+    assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
   }
 
   @Test
   public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
initialTimers =
-        manager.extractFiredTimers();
+    Collection<FiredTimers> initialTimers = manager.extractFiredTimers();
     // Watermarks haven't advanced
-    assertThat(initialTimers.entrySet(), emptyIterable());
+    assertThat(initialTimers, emptyIterable());
 
     // Advance WM of keyed past the first timer, but ahead of the second and 
third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
@@ -1204,15 +1190,10 @@ public class WatermarkManagerTest implements 
Serializable {
         new Instant(1000L));
     manager.refreshAll();
 
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
firstTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(firstFilteredTimers.get(key), not(nullValue()));
-    FiredTimers firstFired = firstFilteredTimers.get(key);
-    assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), 
contains(earliestTimer));
+    Collection<FiredTimers> firstFiredTimers = manager.extractFiredTimers();
+    assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+    FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers);
+    assertThat(firstFired.getTimers(), contains(earliestTimer));
 
     clock.set(new Instant(50_000L));
     manager.updateWatermarks(null,
@@ -1222,24 +1203,19 @@ public class WatermarkManagerTest implements 
Serializable {
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     manager.refreshAll();
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
secondTransformFiredTimers =
+    Collection<FiredTimers> secondFiredTimers =
         manager.extractFiredTimers();
-    assertThat(
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(secondFilteredTimers.get(key), not(nullValue()));
-    FiredTimers secondFired = secondFilteredTimers.get(key);
+    assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+    FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers);
     // Contains, in order, middleTimer and then lastTimer
-    assertThat(secondFired.getTimers(TimeDomain.PROCESSING_TIME), 
contains(middleTimer, lastTimer));
+    assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
   }
 
   @Test
   public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() 
{
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
initialTimers =
-        manager.extractFiredTimers();
+    Collection<FiredTimers> initialTimers = manager.extractFiredTimers();
     // Watermarks haven't advanced
-    assertThat(initialTimers.entrySet(), emptyIterable());
+    assertThat(initialTimers, emptyIterable());
 
     // Advance WM of keyed past the first timer, but ahead of the second and 
third
     CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered);
@@ -1273,16 +1249,11 @@ public class WatermarkManagerTest implements 
Serializable {
         new Instant(1000L));
     manager.refreshAll();
 
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
firstTransformFiredTimers =
+    Collection<FiredTimers> firstFiredTimers =
         manager.extractFiredTimers();
-    assertThat(
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
-        
firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(firstFilteredTimers.get(key), not(nullValue()));
-    FiredTimers firstFired = firstFilteredTimers.get(key);
-    assertThat(
-        firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), 
contains(earliestTimer));
+    assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+    FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers);
+    assertThat(firstFired.getTimers(), contains(earliestTimer));
 
     clock.set(new Instant(50_000L));
     manager.updateWatermarks(null,
@@ -1292,18 +1263,11 @@ public class WatermarkManagerTest implements 
Serializable {
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     manager.refreshAll();
-    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> 
secondTransformFiredTimers =
-        manager.extractFiredTimers();
-    assertThat(
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), 
not(nullValue()));
-    Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
-        
secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
-    assertThat(secondFilteredTimers.get(key), not(nullValue()));
-    FiredTimers secondFired = secondFilteredTimers.get(key);
+    Collection<FiredTimers> secondFiredTimers = manager.extractFiredTimers();
+    assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable()));
+    FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers);
     // Contains, in order, middleTimer and then lastTimer
-    assertThat(
-        secondFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME),
-        contains(middleTimer, lastTimer));
+    assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer));
   }
 
   @Test

Reply via email to