guozhangwang commented on code in PR #12204: URL: https://github.com/apache/kafka/pull/12204#discussion_r907965204
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,182 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null);
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                        */
                     }
                 }
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg);
+                /*
                 tupleForwarder.maybeForward(
                     record.withKey(sessionKey)
                         .withValue(new Change<>(agg, null)));
+                */
+            }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg) {
+            if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if possible
+            final long newTimestamp = windowedkey.window().end();
+            tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+        }
+
+        // TODO: consolidate SessionWindow with TimeWindow to merge common functions
+        private void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) {
+            if (shouldEmitFinal(windowCloseTime)) {
+                final long emitRangeUpperBound = emitRangeUpperBound(windowCloseTime);
+
+                // if the upper bound is smaller than 0, then there's no window closed ever;
+                // and we can skip range fetching
+                if (emitRangeUpperBound >= 0) {
+                    final long emitRangeLowerBound = emitRangeLowerBound();
+
+                    if (shouldRangeFetch(emitRangeLowerBound, emitRangeUpperBound)) {
+                        fetchAndEmit(record, windowCloseTime, emitRangeLowerBound, emitRangeUpperBound);
+                    }
+                }
+            }
+        }
+
+        private boolean shouldEmitFinal(final long windowCloseTime) {
+            if (emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return false;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return false;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if system time jumps a lot,
+            // this can be triggered every time
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Only EMIT if the window close time does progress
+            return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < windowCloseTime;
+        }
+
+        private long emitRangeLowerBound() {
+            return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP ? 0L : Math.max(0L, lastEmitWindowCloseTime);
+        }
+
+        private long emitRangeUpperBound(final long windowCloseTime) {
+            // Session window's start and end timestamps are inclusive, so
+            // we should minus 1 for the inclusive closed window-end upper bound
+            return windowCloseTime - 1;
+        }
+
+        private boolean shouldRangeFetch(final long emitRangeLowerBound, final long emitRangeUpperBound) {
+            return emitRangeUpperBound > emitRangeLowerBound;
+        }
+
+        private void fetchAndEmit(final Record<KIn, VIn> record,
+                                  final long windowCloseTime,
+                                  final long emitRangeLowerBound,
+                                  final long emitRangeUpperBound) {
+            final long startMs = time.milliseconds();

Review Comment:
   Should be ms to be consistent with other metrics.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org