dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959836682
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -279,18 +302,15 @@ public void setTimer(TimerData timerData) {
// persist it first
state.persist(keyedTimerData);
- // TO-DO: apply the same memory optimization over processing timers
+ // add timer data to buffer where applicable
switch (timerData.getDomain()) {
case EVENT_TIME:
/*
* To determine if the upcoming KeyedTimerData could be added to the Buffer while
* guaranteeing the Buffer's timestamps are all <= than those in State Store to preserve
* timestamp eviction priority:
*
- * <p>1) If maxEventTimeInBuffer == long.MAX_VALUE, it indicates that the State is empty,
- * therefore all the Event times greater or lesser than newTimestamp are in the buffer;
- *
- * <p>2) If newTimestamp < maxEventTimeInBuffer, it indicates that there are entries
Review Comment:
This comment no longer represented the code below, which had changed completely
(even before my change), so I removed 1) and kept 2).
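
For readers following the thread, here is a rough sketch of the rule 2) check as I read it from the quoted comment: a timer goes into the in-memory buffer only if its timestamp is below the buffer's current maximum, so everything left only in the store is at least as late as anything buffered. This is not the code from SamzaTimerInternalsFactory. The class name, the `capacity` parameter, the use of plain `long` timestamps, and the two `TreeSet`s standing in for the buffer and the state store are all assumptions made for illustration.

```java
import java.util.TreeSet;

// Hypothetical, simplified model; not the actual Samza runner implementation.
class TimerBufferSketch {
  private final int capacity;
  // Stand-in for the in-memory event-time buffer, sorted by timestamp.
  private final TreeSet<Long> buffer = new TreeSet<>();
  // Stand-in for the persistent state store, which holds every timer.
  private final TreeSet<Long> store = new TreeSet<>();
  // Assumption: nothing has been evicted yet, so any timestamp qualifies at first.
  private long maxEventTimeInBuffer = Long.MAX_VALUE;

  TimerBufferSketch(int capacity) {
    if (capacity < 1) {
      throw new IllegalArgumentException("capacity must be at least 1");
    }
    this.capacity = capacity;
  }

  void setTimer(long newTimestamp) {
    // Persist first, as in the quoted diff.
    store.add(newTimestamp);
    // Rule 2): only buffer timers that sort before the buffer's current maximum.
    if (newTimestamp < maxEventTimeInBuffer) {
      buffer.add(newTimestamp);
      if (buffer.size() > capacity) {
        // Evict the latest timer; it remains available in the store.
        buffer.pollLast();
        maxEventTimeInBuffer = buffer.last();
      }
    }
  }

  // Returns a copy of the buffered timestamps in ascending order.
  TreeSet<Long> buffered() {
    return new TreeSet<>(buffer);
  }

  // Returns the timestamps that live only in the store.
  TreeSet<Long> storeOnly() {
    TreeSet<Long> rest = new TreeSet<>(store);
    rest.removeAll(buffer);
    return rest;
  }
}
```

With a capacity of 2, setting timers at 10, 20 and 30 leaves 10 and 20 buffered and 30 only in the store. A later timer at 15 displaces 20, while a timer at 25 bypasses the buffer entirely.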
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -444,6 +470,16 @@ private class SamzaTimerState {
new TimerKeyCoder<>(keyCoder, windowCoder),
VarLongCoder.of()));
+ this.timestampSortedProcessTimeTimerState =
+ (SamzaSetState<KeyedTimerData<K>>)
+ nonKeyedStateInternalsFactory
+ .stateInternalsForKey(null)
+ .state(
+ StateNamespaces.global(),
+ StateTags.set(
+ timerStateId + "-pts",
Review Comment:
Good point, I will rename it to process-timestamp-sorted.