dxichen commented on code in PR #22976:
URL: https://github.com/apache/beam/pull/22976#discussion_r959834109
##########
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java:
##########
@@ -545,57 +596,58 @@ private void reloadEventTimeTimers() {
}
}
- private void loadProcessingTimeTimers() {
- final Iterator<Map.Entry<TimerKey<K>, Long>> iter =
- processingTimeTimerState.readIterator().read();
- // since the iterator will reach to the end, it will be closed
automatically
- int count = 0;
- while (iter.hasNext()) {
- final Map.Entry<TimerKey<K>, Long> entry = iter.next();
- final KeyedTimerData keyedTimerData =
- TimerKey.toKeyedTimerData(
- entry.getKey(), entry.getValue(), TimeDomain.PROCESSING_TIME,
keyCoder);
-
- timerRegistry.schedule(
- keyedTimerData,
keyedTimerData.getTimerData().getTimestamp().getMillis());
- ++count;
+ private void reloadProcessingTimeTimers() {
+ final Iterator<KeyedTimerData<K>> iter =
+ timestampSortedProcessTimeTimerState.readIterator().read();
+
+ while (iter.hasNext() && processTimeBuffer.size() <
maxProcessTimerBufferSize) {
+ final KeyedTimerData keyedTimerData = iter.next();
+ processTimeBuffer.add(keyedTimerData);
}
- processingTimeTimerState.closeIterators();
- LOG.info("Loaded {} processing time timers in memory", count);
+ timestampSortedProcessTimeTimerState.closeIterators();
+ LOG.info("Loaded {} processing time timers in memory",
processTimeBuffer.size());
}
- /**
- * Restore timer state from RocksDB. This is needed for migration of
existing jobs. Give events
- * in eventTimeTimerState, construct timestampSortedEventTimeTimerState
preparing for memory
- * reloading. TO-DO: processing time timers are still loaded into memory
in one shot; will apply
- * the same optimization mechanism as event time timer
- */
+ /** Restore timer state from RocksDB. */
private void init() {
- final Iterator<Map.Entry<TimerKey<K>, Long>> eventTimersIter =
- eventTimeTimerState.readIterator().read();
- // use hasNext to check empty, because this is relatively cheap compared
with Iterators.size()
- if (eventTimersIter.hasNext()) {
- final Iterator sortedEventTimerIter =
- timestampSortedEventTimeTimerState.readIterator().read();
-
- if (!sortedEventTimerIter.hasNext()) {
- // inline the migration code
- while (eventTimersIter.hasNext()) {
- final Map.Entry<TimerKey<K>, Long> entry = eventTimersIter.next();
- final KeyedTimerData<K> keyedTimerData =
- TimerKey.toKeyedTimerData(
- entry.getKey(), entry.getValue(), TimeDomain.EVENT_TIME,
keyCoder);
- timestampSortedEventTimeTimerState.add(keyedTimerData);
- }
- }
- timestampSortedEventTimeTimerState.closeIterators();
- }
- eventTimeTimerState.closeIterators();
+ migrateToKeyedTimerState(
+ eventTimeTimerState, timestampSortedEventTimeTimerState,
TimeDomain.EVENT_TIME);
+ migrateToKeyedTimerState(
+ processingTimeTimerState,
+ timestampSortedProcessTimeTimerState,
+ TimeDomain.PROCESSING_TIME);
reloadEventTimeTimers();
- loadProcessingTimeTimers();
+ reloadProcessingTimeTimers();
+ }
+ }
+
+ /**
+ * This is needed for migration of existing jobs. Give events in timerState,
construct
+ * keyedTimerState preparing for memory reloading.
+ */
+ private void migrateToKeyedTimerState(
+ SamzaMapState<TimerKey<K>, Long> timerState,
+ SamzaSetState<KeyedTimerData<K>> keyedTimerState,
+ TimeDomain timeDomain) {
+ final Iterator<Map.Entry<TimerKey<K>, Long>> timersIter =
timerState.readIterator().read();
+ // use hasNext to check empty, because this is relatively cheap compared
with Iterators.size()
+ if (timersIter.hasNext()) {
+ final Iterator keyedTimerIter = keyedTimerState.readIterator().read();
+
+ if (!keyedTimerIter.hasNext()) {
+ // Migrate from timerState to keyedTimerState
+ while (timersIter.hasNext()) {
+ final Map.Entry<TimerKey<K>, Long> entry = timersIter.next();
+ final KeyedTimerData<K> keyedTimerData =
+ TimerKey.toKeyedTimerData(entry.getKey(), entry.getValue(),
timeDomain, keyCoder);
+ keyedTimerState.add(keyedTimerData);
+ }
+ }
+ keyedTimerState.closeIterators();
Review Comment:
This could be removed after all the samza-beam jobs have migrated to use
only keyed timer data; I will add a TODO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]