dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959833468


##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -307,7 +327,11 @@ public void setTimer(TimerData timerData) {
           break;
 
         case PROCESSING_TIME:
-          timerRegistry.schedule(keyedTimerData, 
timerData.getTimestamp().getMillis());
+          // Append to buffer iff not full
+          if (processTimeBuffer.size() < maxProcessTimerBufferSize) {
+            processTimeBuffer.add(keyedTimerData);
+          }

Review Comment:
   No we don't need to replace any events in the buffer like in event time 
because process time is fifo only, if the buffer is full we don't do anything 
and it is simply stored in the state 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to