scwhittle commented on code in PR #37604:
URL: https://github.com/apache/beam/pull/37604#discussion_r2821454496
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java:
##########
@@ -58,22 +59,19 @@ public abstract class WindmillTagEncoding {
* @param timerTag tag of the timer that maps to the hold.
*/
public abstract ByteString timerHoldTag(
- WindmillNamespacePrefix prefix, TimerData timerData, ByteString
timerTag);
+ WindmillTimerType windmillTimerType, TimerData timerData, ByteString
timerTag);
/**
* Produce a tag that is guaranteed to be unique for the given prefix,
namespace, domain and
Review Comment:
Update this comment: it still refers to the "prefix", but the parameter is now a WindmillTimerType.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java:
##########
@@ -225,15 +225,16 @@ public TimerData windmillTimerToTimerData(
}
StateNamespace namespace = StateNamespaces.fromString(namespaceString,
windowCoder);
- return TimerData.of(
- timerId,
- timerFamily,
- namespace,
- timestamp,
- outputTimestamp,
- timerTypeToTimeDomain(timer.getType()));
+ return Pair.of(
+ timerType,
+ TimerData.of(
Review Comment:
How about putting the timer type in TimerData? The time domain could be a
function instead, to keep the size the same, and then it is one less allocation
than this Pair.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java:
##########
@@ -159,17 +155,21 @@ public TimerData windmillTimerToTimerData(
// - the Global StateNamespace is different, and becomes "/"
// - the id is totally arbitrary; currently unescaped though that could
change
- ByteString tag = timer.getTag();
- checkArgument(
- tag.startsWith(prefix.byteString()),
- "Expected timer tag %s to start with prefix %s",
- tag,
- prefix.byteString());
+ ByteString tag =
ByteString.copyFrom(timer.getTag().asReadOnlyByteBuffer());
Review Comment:
If we have the namespacePrefixString method, could we avoid this copy to
ByteString?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -832,17 +832,13 @@ public <W extends BoundedWindow> TimerData
getNextFiredTimer(Coder<W> windowCode
if (cachedFiredSystemTimers == null) {
cachedFiredSystemTimers =
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isSystemTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
+ .filter(timer -> timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
windmillTagEncoding.windmillTimerToTimerData(
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
- timer,
- windowCoder,
- getDrainMode()))
+ timer, windowCoder, getDrainMode()))
+ .filter(pair -> pair.getLeft() ==
WindmillTimerType.SYSTEM_TIMER)
Review Comment:
can this remain above the transform since it's cheaper?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java:
##########
@@ -20,27 +20,24 @@
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
-/**
- * A prefix for a Windmill state or timer tag to separate user state and
timers from system state
- * and timers.
- */
+/** A type for a Windmill timer to separate user state and timers from system
state and timers. */
@Internal
-public enum WindmillNamespacePrefix {
- USER_NAMESPACE_PREFIX {
+public enum WindmillTimerType {
+ USER_TIMER {
@Override
- public ByteString byteString() {
+ public ByteString namespacePrefix() {
return USER_NAMESPACE_BYTESTRING;
}
},
- SYSTEM_NAMESPACE_PREFIX {
+ SYSTEM_TIMER {
@Override
- public ByteString byteString() {
+ public ByteString namespacePrefix() {
return SYSTEM_NAMESPACE_BYTESTRING;
}
};
- public abstract ByteString byteString();
+ public abstract ByteString namespacePrefix();
Review Comment:
Maybe this should not be a method on the enum but a static function within the
V1 tag class?
That enforces that elsewhere we are always using the encoding-agnostic type.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -895,17 +891,13 @@ public <W extends BoundedWindow> TimerData
getNextFiredUserTimer(Coder<W> window
cachedFiredUserTimers =
Iterators.peekingIterator(
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
+ .filter(timer ->
timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
windmillTagEncoding.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
- timer,
- windowCoder,
- getDrainMode()))
+ timer, windowCoder, getDrainMode()))
+ .filter(pair -> pair.getLeft() ==
WindmillTimerType.USER_TIMER)
Review Comment:
Ditto: can this filter remain above the transform, since it's cheaper?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java:
##########
@@ -60,28 +60,28 @@ class WindmillTimerInternals implements TimerInternals {
private final Watermarks watermarks;
private final Instant processingTime;
private final String stateFamily;
- private final WindmillNamespacePrefix prefix;
+ private final WindmillTimerType type;
private final Consumer<TimerData> onTimerModified;
private final WindmillTagEncoding windmillTagEncoding;
public WindmillTimerInternals(
String stateFamily, // unique identifies a step
- WindmillNamespacePrefix prefix, // partitions user and system namespaces
into "/u" and "/s"
+ WindmillTimerType type, // partitions user and system namespaces into
"/u" and "/s"
Review Comment:
remove the /u and /s specific comment?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java:
##########
@@ -71,11 +70,11 @@ public InternedByteString stateTag(StateNamespace
namespace, StateTag<?> address
/** {@inheritDoc} */
@Override
public ByteString timerHoldTag(
- WindmillNamespacePrefix prefix, TimerData timerData, ByteString
unusedTimerTag) {
+ WindmillTimerType windmillTimerType, TimerData timerData, ByteString
unusedTimerTag) {
String tagString;
if ("".equals(timerData.getTimerFamilyId())) {
tagString =
- prefix.byteString().toStringUtf8()
+ windmillTimerType.namespacePrefix().toStringUtf8()
Review Comment:
Could have a namespacePrefixString method on the timer type to avoid doing this
same conversion many times.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java:
##########
@@ -415,7 +416,7 @@ public static Collection<Object[]> data() {
public StateNamespace namespace;
@Parameter(1)
- public WindmillNamespacePrefix prefix;
+ public WindmillTimerType prefix;
Review Comment:
Rename this field: it is still called "prefix" but is now a WindmillTimerType.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]