Flatten FiredTimers and ExtractFiredTimers Pass a single collection of fired timers, and have those objects contain the associated transform and key that they fired for. Timers already contain the domain they are in.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5dca2674 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5dca2674 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5dca2674 Branch: refs/heads/apex-runner Commit: 5dca2674a8d145c6e619005c2282c6064cd7aab7 Parents: 6e1e57b Author: Thomas Groh <tg...@google.com> Authored: Thu Nov 3 14:10:37 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Nov 4 13:05:21 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/direct/EvaluationContext.java | 6 +- .../direct/ExecutorServiceParallelExecutor.java | 41 ++++---- .../beam/runners/direct/WatermarkManager.java | 79 ++++++++------ .../runners/direct/EvaluationContextTest.java | 23 ++--- .../runners/direct/WatermarkManagerTest.java | 102 ++++++------------- 5 files changed, 109 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java index 965e77d..b814def 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java @@ -391,11 +391,9 @@ class EvaluationContext { * <p>This is a destructive operation. Timers will only appear in the result of this method once * for each time they are set. */ - public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() { + public Collection<FiredTimers> extractFiredTimers() { forceRefresh(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired = - watermarkManager.extractFiredTimers(); - return fired; + return watermarkManager.extractFiredTimers(); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index e32f671..d1ffea1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -47,7 +47,6 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -440,29 +439,23 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { */ private void fireTimers() throws Exception { try { - for (Map.Entry< - AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> transformTimers : - evaluationContext.extractFiredTimers().entrySet()) { - AppliedPTransform<?, ?, ?> transform = transformTimers.getKey(); - for (Map.Entry<StructuralKey<?>, FiredTimers> keyTimers : - transformTimers.getValue().entrySet()) { - for (TimeDomain domain : TimeDomain.values()) { - Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain); - if (delivery.isEmpty()) { - continue; - } - KeyedWorkItem<?, Object> work = - KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery); - @SuppressWarnings({"unchecked", "rawtypes"}) - CommittedBundle<?> bundle = - evaluationContext - .createKeyedBundle(keyTimers.getKey(), (PCollection) transform.getInput()) - .add(WindowedValue.valueInGlobalWindow(work)) - .commit(evaluationContext.now()); - scheduleConsumption(transform, bundle, new TimerIterableCompletionCallback(delivery)); - state.set(ExecutorState.ACTIVE); - } - } + for (FiredTimers transformTimers : evaluationContext.extractFiredTimers()) { + Collection<TimerData> delivery = transformTimers.getTimers(); + KeyedWorkItem<?, Object> work = + KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery); + @SuppressWarnings({"unchecked", "rawtypes"}) + CommittedBundle<?> bundle = + evaluationContext + .createKeyedBundle( + transformTimers.getKey(), + (PCollection) transformTimers.getTransform().getInput()) + .add(WindowedValue.valueInGlobalWindow(work)) + .commit(evaluationContext.now()); + scheduleConsumption( + transformTimers.getTransform(), + bundle, + new TimerIterableCompletionCallback(delivery)); + state.set(ExecutorState.ACTIVE); } } catch (Exception e) { LOG.error("Internal Error while delivering timers", e); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 31b8091..f01c13c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -740,14 +740,17 @@ public class WatermarkManager { wms = new TransformWatermarks( - inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark); + transform, + inputWatermark, + outputWatermark, + inputProcessingWatermark, + outputProcessingWatermark); transformToWatermarks.put(transform, wms); } return wms; } - private Collection<Watermark> getInputProcessingWatermarks( - AppliedPTransform<?, ?, ?> transform) { + private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) { ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder(); Collection<? extends PValue> inputs = transform.getInput().expand(); if (inputs.isEmpty()) { @@ -924,15 +927,12 @@ public class WatermarkManager { * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the * pending timers will be removed from this {@link WatermarkManager}. */ - public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>(); + public Collection<FiredTimers> extractFiredTimers() { + Collection<FiredTimers> allTimers = new ArrayList<>(); for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry : transformToWatermarks.entrySet()) { - Map<StructuralKey<?>, FiredTimers> keyFiredTimers = - watermarksEntry.getValue().extractFiredTimers(); - if (!keyFiredTimers.isEmpty()) { - allTimers.put(watermarksEntry.getKey(), keyFiredTimers); - } + Collection<FiredTimers> firedTimers = watermarksEntry.getValue().extractFiredTimers(); + allTimers.addAll(firedTimers); } return allTimers; } @@ -1043,6 +1043,8 @@ public class WatermarkManager { * A reference to the input and output watermarks of an {@link AppliedPTransform}. */ public class TransformWatermarks { + private final AppliedPTransform<?, ?, ?> transform; + private final AppliedPTransformInputWatermark inputWatermark; private final AppliedPTransformOutputWatermark outputWatermark; @@ -1053,10 +1055,12 @@ public class WatermarkManager { private Instant latestSynchronizedOutputWm; private TransformWatermarks( + AppliedPTransform<?, ?, ?> transform, AppliedPTransformInputWatermark inputWatermark, AppliedPTransformOutputWatermark outputWatermark, SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark, SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) { + this.transform = transform; this.inputWatermark = inputWatermark; this.outputWatermark = outputWatermark; @@ -1128,7 +1132,7 @@ public class WatermarkManager { synchronizedProcessingInputWatermark.addPending(bundle); } - private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() { + private Collection<FiredTimers> extractFiredTimers() { Map<StructuralKey<?>, List<TimerData>> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers(); Map<StructuralKey<?>, List<TimerData>> processingTimers; @@ -1137,31 +1141,33 @@ public class WatermarkManager { TimeDomain.PROCESSING_TIME, clock.now()); synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers( TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime()); - Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>(); - groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers); - Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>(); - for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers : - groupedTimers.entrySet()) { - keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue())); + Map<StructuralKey<?>, List<TimerData>> timersPerKey = + groupFiredTimers(eventTimeTimers, processingTimers, synchronizedTimers); + Collection<FiredTimers> keyFiredTimers = new ArrayList<>(timersPerKey.size()); + for (Map.Entry<StructuralKey<?>, List<TimerData>> firedTimers : + timersPerKey.entrySet()) { + keyFiredTimers.add( + new FiredTimers(transform, firedTimers.getKey(), firedTimers.getValue())); } return keyFiredTimers; } @SafeVarargs - private final void groupFiredTimers( - Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate, + private final Map<StructuralKey<?>, List<TimerData>> groupFiredTimers( Map<StructuralKey<?>, List<TimerData>>... timersToGroup) { + Map<StructuralKey<?>, List<TimerData>> groupedTimers = new HashMap<>(); for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) { for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) { - Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey()); + List<TimerData> grouped = groupedTimers.get(newTimers.getKey()); if (grouped == null) { - grouped = new HashMap<>(); - groupedToMutate.put(newTimers.getKey(), grouped); + grouped = new ArrayList<>(); + groupedTimers.put(newTimers.getKey(), grouped); } - grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue()); + grouped.addAll(newTimers.getValue()); } } + return groupedTimers; } private void updateTimers(TimerUpdate update) { @@ -1334,24 +1340,35 @@ public class WatermarkManager { * {@link WatermarkManager}. */ public static class FiredTimers { - private final Map<TimeDomain, ? extends Collection<TimerData>> timers; + /** The transform the timers were set at and will be delivered to. */ + private final AppliedPTransform<?, ?, ?> transform; + /** The key the timers were set for and will be delivered to. */ + private final StructuralKey<?> key; + private final Collection<TimerData> timers; - private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> timers) { + private FiredTimers( + AppliedPTransform<?, ?, ?> transform, StructuralKey<?> key, Collection<TimerData> timers) { + this.transform = transform; + this.key = key; this.timers = timers; } + public AppliedPTransform<?, ?, ?> getTransform() { + return transform; + } + + public StructuralKey<?> getKey() { + return key; + } + /** * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers * fired within the provided domain, return an empty collection. * * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp. */ - public Collection<TimerData> getTimers(TimeDomain domain) { - Collection<TimerData> domainTimers = timers.get(domain); - if (domainTimers == null) { - return Collections.emptyList(); - } - return domainTimers; + public Collection<TimerData> getTimers() { + return timers; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index bc53570..e1277ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -373,36 +373,31 @@ public class EvaluationContextTest { .build(); // haven't added any timers, must be empty - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + assertThat(context.extractFiredTimers(), emptyIterable()); context.handleResult( context.createKeyedBundle(key, created).commit(Instant.now()), ImmutableList.<TimerData>of(), timerResult); // timer hasn't fired - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + assertThat(context.extractFiredTimers(), emptyIterable()); TransformResult advanceResult = StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); // Should cause the downstream timer to fire context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired = - context.extractFiredTimers(); + Collection<FiredTimers> fired = context.extractFiredTimers(); assertThat( - fired, - Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal())); - Map<StructuralKey<?>, FiredTimers> downstreamFired = - fired.get(downstream.getProducingTransformInternal()); - assertThat(downstreamFired, Matchers.<Object>hasKey(key)); + Iterables.getOnlyElement(fired).getKey(), + Matchers.<StructuralKey<?>>equalTo(key)); - FiredTimers firedForKey = downstreamFired.get(key); - assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); + FiredTimers firedForKey = Iterables.getOnlyElement(fired); + // Contains exclusively the fired timer + assertThat(firedForKey.getTimers(), contains(toFire)); // Don't reextract timers - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); + assertThat(context.extractFiredTimers(), emptyIterable()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5dca2674/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java index 1954005..6bde462 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; @@ -68,6 +67,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.Matcher; +import org.hamcrest.Matchers; import org.joda.time.Instant; import org.joda.time.ReadableInstant; import org.junit.Before; @@ -915,12 +915,9 @@ public class WatermarkManagerTest implements Serializable { filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(earlierThan(initialFilteredDoubledWm))); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firedTimers = - manager.extractFiredTimers(); + Collection<FiredTimers> firedTimers = manager.extractFiredTimers(); assertThat( - firedTimers.get(filtered.getProducingTransformInternal()) - .get(key) - .getTimers(TimeDomain.PROCESSING_TIME), + Iterables.getOnlyElement(firedTimers).getTimers(), contains(pastTimer)); // Our timer has fired, but has not been completed, so it holds our synchronized processing WM assertThat(filteredWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime))); @@ -1099,10 +1096,9 @@ public class WatermarkManagerTest implements Serializable { @Test public void extractFiredTimersReturnsFiredEventTimeTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers = - manager.extractFiredTimers(); + Collection<FiredTimers> initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced - assertThat(initialTimers.entrySet(), emptyIterable()); + assertThat(initialTimers, emptyIterable()); // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); @@ -1136,15 +1132,11 @@ public class WatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers = + Collection<FiredTimers> firstFiredTimers = manager.extractFiredTimers(); - assertThat( - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<StructuralKey<?>, FiredTimers> firstFilteredTimers = - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(firstFilteredTimers.get(key), not(nullValue())); - FiredTimers firstFired = firstFilteredTimers.get(key); - assertThat(firstFired.getTimers(TimeDomain.EVENT_TIME), contains(earliestTimer)); + assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable())); + FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers); + assertThat(firstFired.getTimers(), contains(earliestTimer)); manager.updateWatermarks(null, TimerUpdate.empty(), @@ -1153,24 +1145,18 @@ public class WatermarkManagerTest implements Serializable { Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers = - manager.extractFiredTimers(); - assertThat( - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<StructuralKey<?>, FiredTimers> secondFilteredTimers = - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(secondFilteredTimers.get(key), not(nullValue())); - FiredTimers secondFired = secondFilteredTimers.get(key); + Collection<FiredTimers> secondFiredTimers = manager.extractFiredTimers(); + assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable())); + FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers); // Contains, in order, middleTimer and then lastTimer - assertThat(secondFired.getTimers(TimeDomain.EVENT_TIME), contains(middleTimer, lastTimer)); + assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer)); } @Test public void extractFiredTimersReturnsFiredProcessingTimeTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers = - manager.extractFiredTimers(); + Collection<FiredTimers> initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced - assertThat(initialTimers.entrySet(), emptyIterable()); + assertThat(initialTimers, emptyIterable()); // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); @@ -1204,15 +1190,10 @@ public class WatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers = - manager.extractFiredTimers(); - assertThat( - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<StructuralKey<?>, FiredTimers> firstFilteredTimers = - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(firstFilteredTimers.get(key), not(nullValue())); - FiredTimers firstFired = firstFilteredTimers.get(key); - assertThat(firstFired.getTimers(TimeDomain.PROCESSING_TIME), contains(earliestTimer)); + Collection<FiredTimers> firstFiredTimers = manager.extractFiredTimers(); + assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable())); + FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers); + assertThat(firstFired.getTimers(), contains(earliestTimer)); clock.set(new Instant(50_000L)); manager.updateWatermarks(null, @@ -1222,24 +1203,19 @@ public class WatermarkManagerTest implements Serializable { Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers = + Collection<FiredTimers> secondFiredTimers = manager.extractFiredTimers(); - assertThat( - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<StructuralKey<?>, FiredTimers> secondFilteredTimers = - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(secondFilteredTimers.get(key), not(nullValue())); - FiredTimers secondFired = secondFilteredTimers.get(key); + assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable())); + FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers); // Contains, in order, middleTimer and then lastTimer - assertThat(secondFired.getTimers(TimeDomain.PROCESSING_TIME), contains(middleTimer, lastTimer)); + assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer)); } @Test public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() { - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers = - manager.extractFiredTimers(); + Collection<FiredTimers> initialTimers = manager.extractFiredTimers(); // Watermarks haven't advanced - assertThat(initialTimers.entrySet(), emptyIterable()); + assertThat(initialTimers, emptyIterable()); // Advance WM of keyed past the first timer, but ahead of the second and third CommittedBundle<Integer> createdBundle = multiWindowedBundle(filtered); @@ -1273,16 +1249,11 @@ public class WatermarkManagerTest implements Serializable { new Instant(1000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers = + Collection<FiredTimers> firstFiredTimers = manager.extractFiredTimers(); - assertThat( - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<StructuralKey<?>, FiredTimers> firstFilteredTimers = - firstTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(firstFilteredTimers.get(key), not(nullValue())); - FiredTimers firstFired = firstFilteredTimers.get(key); - assertThat( - firstFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), contains(earliestTimer)); + assertThat(firstFiredTimers, not(Matchers.<FiredTimers>emptyIterable())); + FiredTimers firstFired = Iterables.getOnlyElement(firstFiredTimers); + assertThat(firstFired.getTimers(), contains(earliestTimer)); clock.set(new Instant(50_000L)); manager.updateWatermarks(null, @@ -1292,18 +1263,11 @@ public class WatermarkManagerTest implements Serializable { Collections.<CommittedBundle<?>>emptyList()), new Instant(50_000L)); manager.refreshAll(); - Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers = - manager.extractFiredTimers(); - assertThat( - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue())); - Map<StructuralKey<?>, FiredTimers> secondFilteredTimers = - secondTransformFiredTimers.get(filtered.getProducingTransformInternal()); - assertThat(secondFilteredTimers.get(key), not(nullValue())); - FiredTimers secondFired = secondFilteredTimers.get(key); + Collection<FiredTimers> secondFiredTimers = manager.extractFiredTimers(); + assertThat(secondFiredTimers, not(Matchers.<FiredTimers>emptyIterable())); + FiredTimers secondFired = Iterables.getOnlyElement(secondFiredTimers); // Contains, in order, middleTimer and then lastTimer - assertThat( - secondFired.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), - contains(middleTimer, lastTimer)); + assertThat(secondFired.getTimers(), contains(middleTimer, lastTimer)); } @Test