Support set and delete of timer by ID in InMemoryTimerInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df2e540d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df2e540d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df2e540d Branch: refs/heads/python-sdk Commit: df2e540d7a7b8444b9ff3b404740d5a3394b7691 Parents: acd2196 Author: Kenneth Knowles <k...@google.com> Authored: Mon Dec 19 14:01:36 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 11:01:36 2016 -0800 ---------------------------------------------------------------------- .../runners/core/InMemoryTimerInternals.java | 65 +++++++---- .../core/InMemoryTimerInternalsTest.java | 112 +++++++++++++------ 2 files changed, 120 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 5fcd088..292ac23 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -17,13 +17,15 @@ */ package org.apache.beam.runners.core; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.base.MoreObjects; -import java.util.HashSet; -import java.util.PriorityQueue; -import java.util.Set; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import java.util.NavigableSet; +import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; @@ -35,17 +37,17 @@ import org.joda.time.Instant; /** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */ public class InMemoryTimerInternals implements TimerInternals { - /** At most one timer per timestamp is kept. */ - private Set<TimerData> existingTimers = new HashSet<>(); + /** The current set timers by namespace and ID. */ + Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create(); /** Pending input watermark timers, in timestamp order. */ - private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11); + private NavigableSet<TimerData> watermarkTimers = new TreeSet<>(); /** Pending processing time timers, in timestamp order. */ - private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11); + private NavigableSet<TimerData> processingTimers = new TreeSet<>(); /** Pending synchronized processing time timers, in timestamp order. */ - private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11); + private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>(); /** Current input watermark. */ private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; @@ -74,13 +76,13 @@ public class InMemoryTimerInternals implements TimerInternals { final TimerData data; switch (domain) { case EVENT_TIME: - data = watermarkTimers.peek(); + data = watermarkTimers.first(); break; case PROCESSING_TIME: - data = processingTimers.peek(); + data = processingTimers.first(); break; case SYNCHRONIZED_PROCESSING_TIME: - data = synchronizedProcessingTimers.peek(); + data = synchronizedProcessingTimers.first(); break; default: throw new IllegalArgumentException("Unexpected time domain: " + domain); @@ -88,7 +90,7 @@ public class InMemoryTimerInternals implements TimerInternals { return (data == null) ? null : data.getTimestamp(); } - private PriorityQueue<TimerData> queue(TimeDomain domain) { + private NavigableSet<TimerData> timersForDomain(TimeDomain domain) { switch (domain) { case EVENT_TIME: return watermarkTimers; @@ -104,27 +106,45 @@ public class InMemoryTimerInternals implements TimerInternals { @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); + setTimer(TimerData.of(timerId, namespace, target, timeDomain)); } @Override public void setTimer(TimerData timerData) { WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); - if (existingTimers.add(timerData)) { - queue(timerData.getDomain()).add(timerData); + + @Nullable + TimerData existing = existingTimers.get(timerData.getNamespace(), timerData.getTimerId()); + if (existing == null) { + existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData); + timersForDomain(timerData.getDomain()).add(timerData); + } else { + checkArgument(timerData.getDomain().equals(existing.getDomain()), + "Attempt to set %s for time domain %s, but it is already set for time domain %s", + timerData.getTimerId(), timerData.getDomain(), existing.getDomain()); + + if (!timerData.getTimestamp().equals(existing.getTimestamp())) { + NavigableSet<TimerData> timers = timersForDomain(timerData.getDomain()); + timers.remove(existing); + timers.add(timerData); + existingTimers.put(timerData.getNamespace(), timerData.getTimerId(), timerData); + } } } @Override public void deleteTimer(StateNamespace namespace, String timerId) { - throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + TimerData existing = existingTimers.get(namespace, timerId); + if (existing != null) { + deleteTimer(existing); + } } @Override public void deleteTimer(TimerData timer) { WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer); - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); + existingTimers.remove(timer.getNamespace(), timer.getTimerId()); + timersForDomain(timer.getDomain()).remove(timer); } @Override @@ -261,10 +281,11 @@ public class InMemoryTimerInternals implements TimerInternals { @Nullable private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) { - PriorityQueue<TimerData> queue = queue(domain); - if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { - TimerData timer = queue.remove(); - existingTimers.remove(timer); + NavigableSet<TimerData> timers = timersForDomain(domain); + + if (!timers.isEmpty() && currentTime.isAfter(timers.first().getTimestamp())) { + TimerData timer = timers.pollFirst(); + existingTimers.remove(timer.getNamespace(), timer.getTimerId()); return timer; } else { return null; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/df2e540d/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index 2caa874..e711285 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -19,8 +19,6 @@ package org.apache.beam.runners.core; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.util.TimeDomain; @@ -39,37 +37,79 @@ import org.junit.runners.JUnit4; public class InMemoryTimerInternalsTest { private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); + private static final String ID1 = "id1"; + private static final String ID2 = "id2"; @Test - public void testFiringTimers() throws Exception { + public void testFiringEventTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + TimerData eventTimer1 = TimerData.of(ID1, NS1, new Instant(19), TimeDomain.EVENT_TIME); + TimerData eventTimer2 = TimerData.of(ID2, NS1, new Instant(29), TimeDomain.EVENT_TIME); - underTest.setTimer(processingTime1); - underTest.setTimer(processingTime2); + underTest.setTimer(eventTimer1); + underTest.setTimer(eventTimer2); - underTest.advanceProcessingTime(new Instant(20)); - assertEquals(processingTime1, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); + underTest.advanceInputWatermark(new Instant(20)); + assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer1)); + assertThat(underTest.removeNextEventTimer(), nullValue()); // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(new Instant(21)); - assertNull(underTest.removeNextProcessingTimer()); + underTest.advanceInputWatermark(new Instant(21)); + assertThat(underTest.removeNextEventTimer(), nullValue()); // Adding the timer and advancing a little should refire - underTest.setTimer(processingTime1); - assertEquals(processingTime1, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); + underTest.setTimer(eventTimer1); + assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer1)); + assertThat(underTest.removeNextEventTimer(), nullValue()); // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(new Instant(30)); - assertEquals(processingTime2, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); + underTest.advanceInputWatermark(new Instant(30)); + assertThat(underTest.removeNextEventTimer(), equalTo(eventTimer2)); + assertThat(underTest.removeNextEventTimer(), nullValue()); + } + + @Test + public void testResetById() throws Exception { + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); + Instant earlyTimestamp = new Instant(13); + Instant laterTimestamp = new Instant(42); + + underTest.advanceInputWatermark(new Instant(0)); + underTest.setTimer(NS1, ID1, earlyTimestamp, TimeDomain.EVENT_TIME); + underTest.setTimer(NS1, ID1, laterTimestamp, TimeDomain.EVENT_TIME); + underTest.advanceInputWatermark(earlyTimestamp.plus(1L)); + assertThat(underTest.removeNextEventTimer(), nullValue()); + + underTest.advanceInputWatermark(laterTimestamp.plus(1L)); + assertThat( + underTest.removeNextEventTimer(), + equalTo(TimerData.of(ID1, NS1, laterTimestamp, TimeDomain.EVENT_TIME))); + } + + @Test + public void testDeletionIdempotent() throws Exception { + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); + Instant timestamp = new Instant(42); + underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME); + underTest.deleteTimer(NS1, ID1); + underTest.deleteTimer(NS1, ID1); } @Test - public void testFiringTimersWithCallback() throws Exception { + public void testDeletionById() throws Exception { + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); + Instant timestamp = new Instant(42); + + underTest.advanceInputWatermark(new Instant(0)); + underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME); + underTest.deleteTimer(NS1, ID1); + underTest.advanceInputWatermark(new Instant(43)); + + assertThat(underTest.removeNextEventTimer(), nullValue()); + } + + @Test + public void testFiringProcessingTimeTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); @@ -116,23 +156,25 @@ public class InMemoryTimerInternalsTest { underTest.setTimer(eventTime2); underTest.setTimer(synchronizedProcessingTime2); - assertNull(underTest.removeNextEventTimer()); + assertThat(underTest.removeNextEventTimer(), nullValue()); underTest.advanceInputWatermark(new Instant(30)); - assertEquals(eventTime1, underTest.removeNextEventTimer()); - assertEquals(eventTime2, underTest.removeNextEventTimer()); - assertNull(underTest.removeNextEventTimer()); + assertThat(underTest.removeNextEventTimer(), equalTo(eventTime1)); + assertThat(underTest.removeNextEventTimer(), equalTo(eventTime2)); + assertThat(underTest.removeNextEventTimer(), nullValue()); - assertNull(underTest.removeNextProcessingTimer()); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); underTest.advanceProcessingTime(new Instant(30)); - assertEquals(processingTime1, underTest.removeNextProcessingTimer()); - assertEquals(processingTime2, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); - assertNull(underTest.removeNextSynchronizedProcessingTimer()); + assertThat(underTest.removeNextSynchronizedProcessingTimer(), nullValue()); underTest.advanceSynchronizedProcessingTime(new Instant(30)); - assertEquals(synchronizedProcessingTime1, underTest.removeNextSynchronizedProcessingTimer()); - assertEquals(synchronizedProcessingTime2, underTest.removeNextSynchronizedProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); + assertThat( + underTest.removeNextSynchronizedProcessingTimer(), equalTo(synchronizedProcessingTime1)); + assertThat( + underTest.removeNextSynchronizedProcessingTimer(), equalTo(synchronizedProcessingTime2)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); } @Test @@ -147,9 +189,9 @@ public class InMemoryTimerInternalsTest { underTest.advanceProcessingTime(new Instant(20)); underTest.advanceInputWatermark(new Instant(20)); - assertEquals(processingTime, underTest.removeNextProcessingTimer()); - assertNull(underTest.removeNextProcessingTimer()); - assertEquals(eventTime, underTest.removeNextEventTimer()); - assertNull(underTest.removeNextEventTimer()); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); + assertThat(underTest.removeNextEventTimer(), equalTo(eventTime)); + assertThat(underTest.removeNextEventTimer(), nullValue()); } }