Add timerId to TimerData. This timerId is generated to be identical to historical behavior, and to be unique per time domain and timestamp.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/840fb3b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/840fb3b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/840fb3b9 Branch: refs/heads/master Commit: 840fb3b9030908ef50937cc2e4498a2cdcb7b680 Parents: a088449 Author: Kenneth Knowles <k...@google.com> Authored: Wed Nov 23 14:30:57 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Dec 2 13:28:57 2016 -0800 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 4 +- .../runners/direct/DirectTimerInternals.java | 4 +- .../apache/beam/sdk/util/TimerInternals.java | 107 +++++++------------ .../sdk/util/state/InMemoryTimerInternals.java | 8 +- .../beam/sdk/util/TimerInternalsTest.java | 4 +- 5 files changed, 49 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 3b0e4f2..f49c785 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -426,8 +426,8 @@ public class ApexGroupByKeyOperator<K, V> implements Operator { public class ApexTimerInternals implements TimerInternals { @Override - public void setTimer(TimerData timerKey) { - registerActiveTimer(context.element().key(), 
timerKey); + public void setTimer(TimerData timerData) { + registerActiveTimer(context.element().key(), timerData); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 4245a87..8970b4b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -53,8 +53,8 @@ class DirectTimerInternals implements TimerInternals { } @Override - public void setTimer(TimerData timerKey) { - timerUpdateBuilder.setTimer(timerKey); + public void setTimer(TimerData timerData) { + timerUpdateBuilder.setTimer(timerData); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java index 5d4a72d..c3e498e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java @@ -18,18 +18,16 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.MoreObjects; +import com.google.auto.value.AutoValue; 
import com.google.common.collect.ComparisonChain; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.List; -import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -50,7 +48,7 @@ import org.joda.time.Instant; public interface TimerInternals { /** - * Writes out a timer to be fired when the current time in the specified time domain reaches the + * Sets a timer to be fired when the current time in the specified time domain reaches the * target timestamp. * * <p>The combination of {@code namespace} and {@code timerId} uniquely identify a timer. @@ -63,14 +61,9 @@ public interface TimerInternals { void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain); /** - * Writes out a timer to be fired when the watermark reaches the given timestamp, automatically - * generating an id for it from the provided {@link TimerData}. - * - * <p>The {@link TimerData} contains all the fields necessary to set the timer. The timer's ID - * is determinstically generated from the {@link TimerData}, so it may be canceled using - * the same {@link TimerData}. + * Sets the timer described by {@code timerData}. */ - void setTimer(TimerData timerKey); + void setTimer(TimerData timerData); /** * Deletes the given timer. @@ -78,7 +71,7 @@ public interface TimerInternals { void deleteTimer(StateNamespace namespace, String timerId); /** - * Deletes the given timer, automatically inferring its ID from the {@link TimerData}. + * Deletes the timer with the ID contained in the provided {@link TimerData}. */ void deleteTimer(TimerData timerKey); @@ -163,64 +156,38 @@ public interface TimerInternals { /** * Data about a timer as represented within {@link TimerInternals}. 
*/ - class TimerData implements Comparable<TimerData> { - private final StateNamespace namespace; - private final Instant timestamp; - private final TimeDomain domain; + @AutoValue + abstract class TimerData implements Comparable<TimerData> { - private TimerData(StateNamespace namespace, Instant timestamp, TimeDomain domain) { - this.namespace = checkNotNull(namespace); - this.timestamp = checkNotNull(timestamp); - this.domain = checkNotNull(domain); - } + public abstract String getTimerId(); - public StateNamespace getNamespace() { - return namespace; - } + public abstract StateNamespace getNamespace(); - public Instant getTimestamp() { - return timestamp; - } + public abstract Instant getTimestamp(); - public TimeDomain getDomain() { - return domain; - } + public abstract TimeDomain getDomain(); /** - * Construct the {@code TimerKey} for the given parameters. + * Construct a {@link TimerData} for the given parameters, where the timer ID is automatically + * generated. */ - public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) { - return new TimerData(namespace, timestamp, domain); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (!(obj instanceof TimerData)) { - return false; - } - - TimerData that = (TimerData) obj; - return Objects.equals(this.domain, that.domain) - && this.timestamp.isEqual(that.timestamp) - && Objects.equals(this.namespace, that.namespace); - } - - @Override - public int hashCode() { - return Objects.hash(domain, timestamp, namespace); + public static TimerData of( + String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) { + return new AutoValue_TimerInternals_TimerData(timerId, namespace, timestamp, domain); } - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("namespace", namespace) - .add("timestamp", timestamp) - .add("domain", domain) - .toString(); + /** + * Construct a {@link 
TimerData} for the given parameters, where the timer ID is + * deterministically generated from the {@code timestamp} and {@code domain}. + */ + public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) { + String timerId = + new StringBuilder() + .append(domain.ordinal()) + .append(':') + .append(timestamp.getMillis()) + .toString(); + return of(timerId, namespace, timestamp, domain); } /** @@ -236,11 +203,11 @@ public interface TimerInternals { } ComparisonChain chain = ComparisonChain.start() - .compare(this.timestamp, that.getTimestamp()) - .compare(this.domain, that.domain); - if (chain.result() == 0 && !this.namespace.equals(that.namespace)) { + .compare(this.getTimestamp(), that.getTimestamp()) + .compare(this.getDomain(), that.getDomain()); + if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) { // Obtaining the stringKey may be expensive; only do so if required - chain = chain.compare(namespace.stringKey(), that.namespace.stringKey()); + chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey()); } return chain.result(); } @@ -275,20 +242,22 @@ public interface TimerInternals { public void encode(TimerData timer, OutputStream outStream, Context context) throws CoderException, IOException { Context nestedContext = context.nested(); - STRING_CODER.encode(timer.namespace.stringKey(), outStream, nestedContext); - INSTANT_CODER.encode(timer.timestamp, outStream, nestedContext); - STRING_CODER.encode(timer.domain.name(), outStream, nestedContext); + STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext); + STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext); + INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext); + STRING_CODER.encode(timer.getDomain().name(), outStream, nestedContext); } @Override public TimerData decode(InputStream inStream, Context context) throws CoderException, IOException { Context nestedContext = 
context.nested(); + String timerId = STRING_CODER.decode(inStream, nestedContext); StateNamespace namespace = StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder); Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext); TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext)); - return TimerData.of(namespace, timestamp, domain); + return TimerData.of(timerId, namespace, timestamp, domain); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index a3bb45a..60a90f5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -104,10 +104,10 @@ public class InMemoryTimerInternals implements TimerInternals { } @Override - public void setTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.setTimer: {}", timer); - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); + public void setTimer(TimerData timerData) { + WindowTracing.trace("TestTimerInternals.setTimer: {}", timerData); + if (existingTimers.add(timerData)) { + queue(timerData.getDomain()).add(timerData); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java index 
e8ffdb3..7b56f1c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java @@ -45,12 +45,14 @@ public class TimerInternalsTest { public void testTimerDataCoder() throws Exception { CoderProperties.coderDecodeEncodeEqual( TimerDataCoder.of(GlobalWindow.Coder.INSTANCE), - TimerData.of(StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME)); + TimerData.of( + "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME)); Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder(); CoderProperties.coderDecodeEncodeEqual( TimerDataCoder.of(windowCoder), TimerData.of( + "another-id", StateNamespaces.window( windowCoder, new IntervalWindow(new Instant(0), new Instant(100))), new Instant(99), TimeDomain.PROCESSING_TIME));