johnyangk commented on a change in pull request #178: [NEMO-317] Optimize triggering logic in GroupByKeyAndWindowDoFnTransform
URL: https://github.com/apache/incubator-nemo/pull/178#discussion_r241645416
##########
File path: compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
##########
@@ -212,65 +213,59 @@ public void onWatermark(final Watermark inputWatermark) {
@Override
protected void beforeClose() {
// Finish any pending windows by advancing the input watermark to infinity.
- processElementsAndTriggerTimers(new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()),
- BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ inputWatermark = new Watermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ processElementsAndTriggerTimers(BoundedWindow.TIMESTAMP_MAX_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
}
/**
- * Trigger times for current key.
+ * Trigger timers if next timers exist.
* When triggering, it emits the windowed data to downstream operators.
- * @param key key
- * @param watermark watermark
* @param processingTime processing time
* @param synchronizedTime synchronized time
*/
- private void triggerTimers(final K key,
- final Watermark watermark,
- final Instant processingTime,
- final Instant synchronizedTime) {
- final InMemoryTimerInternals timerInternals = (InMemoryTimerInternals)
- inMemoryTimerInternalsFactory.timerInternalsForKey(key);
- try {
- timerInternals.advanceInputWatermark(new Instant(watermark.getTimestamp()));
- timerInternals.advanceProcessingTime(processingTime);
- timerInternals.advanceSynchronizedProcessingTime(synchronizedTime);
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
+ private int triggerTimers(final Instant processingTime,
+ final Instant synchronizedTime) {
+ final ContextForTimer<K> context = inMemoryTimerInternalsFactory.context;
+ context.setCurrentInputWatermarkTime(new Instant(inputWatermark.getTimestamp()));
+ context.setCurrentProcessingTime(processingTime);
+ context.setCurrentSynchronizedProcessingTime(synchronizedTime);
- final List<TimerInternals.TimerData> timerDataList = getEligibleTimers(timerInternals);
+ // get next eligible timers
+ final List<Pair<K, TimerInternals.TimerData>> timers = getEligibleTimers();
- if (!timerDataList.isEmpty()) {
+ for (final Pair<K, TimerInternals.TimerData> timer : timers) {
// Trigger timers and emit windowed data
final KeyedWorkItem<K, InputT> timerWorkItem =
- KeyedWorkItems.timersWorkItem(key, timerDataList);
+ KeyedWorkItems.timersWorkItem(timer.left(), Collections.singletonList(timer.right()));
Review comment:
Minor question: would it be worthwhile to group the timers by key first, and then invoke `processElement` once per key, to minimize the number of invocations?
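To illustrate the grouping asked about above, here is a minimal, self-contained sketch. It deliberately avoids Nemo and Beam types: `KeyedTimer` is a hypothetical stand-in for `Pair<K, TimerInternals.TimerData>`, and plain strings stand in for the timer data. The eligible timers are grouped with a `LinkedHashMap`, so each key would get one `KeyedWorkItems.timersWorkItem(key, timersForKey)` and one `processElement` call, instead of one call per timer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: batch (key, timer) pairs per key so that each key
 * can be processed with a single call instead of one call per timer.
 * KeyedTimer and the String payloads are stand-ins, not Nemo/Beam types.
 */
public final class TimerGrouping {

  /** Hypothetical stand-in for Pair<K, TimerInternals.TimerData>. */
  public static final class KeyedTimer<K, T> {
    final K key;
    final T timer;

    public KeyedTimer(final K key, final T timer) {
      this.key = key;
      this.timer = timer;
    }
  }

  /**
   * Groups timers by key. LinkedHashMap keeps keys in first-seen order,
   * and each per-key list keeps its timers in their original order.
   */
  public static <K, T> Map<K, List<T>> groupByKey(final List<KeyedTimer<K, T>> timers) {
    final Map<K, List<T>> grouped = new LinkedHashMap<>();
    for (final KeyedTimer<K, T> t : timers) {
      grouped.computeIfAbsent(t.key, k -> new ArrayList<>()).add(t.timer);
    }
    return grouped;
  }

  public static void main(final String[] args) {
    final List<KeyedTimer<String, String>> timers = List.of(
        new KeyedTimer<>("a", "t1"),
        new KeyedTimer<>("b", "t2"),
        new KeyedTimer<>("a", "t3"));

    final Map<String, List<String>> grouped = groupByKey(timers);
    // Two batched invocations (one per key) instead of three (one per timer).
    for (final Map.Entry<String, List<String>> e : grouped.entrySet()) {
      // In the transform, this is roughly where a work item such as
      // KeyedWorkItems.timersWorkItem(e.getKey(), e.getValue()) would be
      // built and handed to processElement once.
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}
```

Grouping keeps the order of timers within a key but changes the interleaving across keys; whether that is acceptable depends on how `getEligibleTimers()` orders the timers it returns, which this hunk does not show.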