guozhangwang commented on code in PR #12204: URL: https://github.com/apache/kafka/pull/12204#discussion_r907967640
########## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java: ########## @@ -71,6 +72,36 @@ public byte[] fetchSession(final Bytes key, )); } + public KeyValueIterator<Bytes, byte[]> fetchSessions(final long earliestSessionEndTime, + final long latestSessionEndTime) { + final List<KeyValueSegment> searchSpace = segments.segments(earliestSessionEndTime, latestSessionEndTime, true); + + // here we want [0, latestSE, FF] as the upper bound to cover any possible keys, Review Comment: This is the other minor bug I detected in the latest commit. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java: ########## @@ -148,55 +173,174 @@ public void process(final Record<KIn, VIn> record) { } } - if (mergedWindow.end() < closeTime) { - if (context().recordMetadata().isPresent()) { - final RecordMetadata recordMetadata = context().recordMetadata().get(); - LOG.warn( - "Skipping record for expired window. " + - "topic=[{}] " + - "partition=[{}] " + - "offset=[{}] " + - "timestamp=[{}] " + - "window=[{},{}] " + - "expiration=[{}] " + - "streamTime=[{}]", - recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(), - timestamp, - mergedWindow.start(), mergedWindow.end(), - closeTime, - observedStreamTime - ); - } else { - LOG.warn( - "Skipping record for expired window. Topic, partition, and offset not known. 
" + - "timestamp=[{}] " + - "window=[{},{}] " + - "expiration=[{}] " + - "streamTime=[{}]", - timestamp, - mergedWindow.start(), mergedWindow.end(), - closeTime, - observedStreamTime - ); - } - droppedRecordsSensor.record(); + if (mergedWindow.end() < windowCloseTime) { + logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow); } else { if (!mergedWindow.equals(newSessionWindow)) { for (final KeyValue<Windowed<KIn>, VAgg> session : merged) { store.remove(session.key); - tupleForwarder.maybeForward( - record.withKey(session.key) - .withValue(new Change<>(null, session.value))); + + maybeForwardUpdate(session.key, session.value, null); } } agg = aggregator.apply(record.key(), record.value(), agg); final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow); store.put(sessionKey, agg); + + maybeForwardUpdate(sessionKey, null, agg); + } + + maybeForwardFinalResult(record, windowCloseTime); + } + + private void maybeForwardUpdate(final Windowed<KIn> windowedkey, + final VAgg oldAgg, + final VAgg newAgg) { + if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { + return; + } + + // Update the sent record timestamp to the window end time if possible + final long newTimestamp = windowedkey.window().end(); + tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? 
oldAgg : null), newTimestamp)); + } + + // TODO: consolidate SessionWindow with TimeWindow to merge common functions + private void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) { + if (shouldEmitFinal(windowCloseTime)) { + final long emitRangeUpperBound = emitRangeUpperBound(windowCloseTime); + + // if the upper bound is smaller than 0, then there's no window closed ever; + // and we can skip range fetching + if (emitRangeUpperBound >= 0) { + final long emitRangeLowerBound = emitRangeLowerBound(); + + if (shouldRangeFetch(emitRangeLowerBound, emitRangeUpperBound)) { + fetchAndEmit(record, windowCloseTime, emitRangeLowerBound, emitRangeUpperBound); + } + } + } + } + + private boolean shouldEmitFinal(final long windowCloseTime) { + if (emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) { + return false; + } + + final long now = internalProcessorContext.currentSystemTimeMs(); + // Throttle emit frequency + if (now < timeTracker.nextTimeToEmit) { + return false; + } + + // Schedule next emit time based on now to avoid the case that if system time jumps a lot, + // this can be triggered every time + timeTracker.nextTimeToEmit = now; + timeTracker.advanceNextTimeToEmit(); + + // Only EMIT if the window close time does progress + return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || lastEmitWindowCloseTime < windowCloseTime; + } + + private long emitRangeLowerBound() { + return Math.max(0L, lastEmitWindowCloseTime); + } + + private long emitRangeUpperBound(final long windowCloseTime) { + // Session window's start and end timestamps are inclusive, so + // we should minus 1 for the inclusive closed window-end upper bound + return windowCloseTime - 1; + } + + private boolean shouldRangeFetch(final long emitRangeLowerBound, final long emitRangeUpperBound) { Review Comment: This is one minor bug I detected in the latest commit. 
########## streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java: ########## @@ -52,6 +53,16 @@ private SessionStore<Bytes, byte[]> maybeWrapCaching(final SessionStore<Bytes, b if (!enableCaching) { return inner; } + + if (!inner.persistent()) { Review Comment: Here's the other change I made in order to work around the current tricky situation: since in-memory stores are always "time ordered" as well, we strip the caching if the inner store is not persistent. cc @mjsax -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org