guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r907967640


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStore.java:
##########
@@ -71,6 +72,36 @@ public byte[] fetchSession(final Bytes key,
         ));
     }
 
+    public KeyValueIterator<Bytes, byte[]> fetchSessions(final long 
earliestSessionEndTime,
+                                                         final long 
latestSessionEndTime) {
+        final List<KeyValueSegment> searchSpace = 
segments.segments(earliestSessionEndTime, latestSessionEndTime, true);
+
+        // here we want [0, latestSE, FF] as the upper bound to cover any 
possible keys,

Review Comment:
   This is the other minor bug I detected in the latest commit.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,174 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = 
context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, 
and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, 
mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) 
{
                         store.remove(session.key);
-                        tupleForwarder.maybeForward(
-                            record.withKey(session.key)
-                                .withValue(new Change<>(null, session.value)));
+
+                        maybeForwardUpdate(session.key, session.value, null);
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), 
mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg);
+            }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg) {
+            if (emitStrategy.type() == 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if 
possible
+            final long newTimestamp = windowedkey.window().end();
+            tupleForwarder.maybeForward(new Record<>(windowedkey, new 
Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+        }
+
+        // TODO: consolidate SessionWindow with TimeWindow to merge common 
functions
+        private void maybeForwardFinalResult(final Record<KIn, VIn> record, 
final long windowCloseTime) {
+            if (shouldEmitFinal(windowCloseTime)) {
+                final long emitRangeUpperBound = 
emitRangeUpperBound(windowCloseTime);
+
+                // if the upper bound is smaller than 0, then there's no 
window closed ever;
+                // and we can skip range fetching
+                if (emitRangeUpperBound >= 0) {
+                    final long emitRangeLowerBound = emitRangeLowerBound();
+
+                    if (shouldRangeFetch(emitRangeLowerBound, 
emitRangeUpperBound)) {
+                        fetchAndEmit(record, windowCloseTime, 
emitRangeLowerBound, emitRangeUpperBound);
+                    }
+                }
+            }
+        }
+
+        private boolean shouldEmitFinal(final long windowCloseTime) {
+            if (emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return false;
+            }
+
+            final long now = internalProcessorContext.currentSystemTimeMs();
+            // Throttle emit frequency
+            if (now < timeTracker.nextTimeToEmit) {
+                return false;
+            }
+
+            // Schedule next emit time based on now to avoid the case that if 
system time jumps a lot,
+            // this can be triggered every time
+            timeTracker.nextTimeToEmit = now;
+            timeTracker.advanceNextTimeToEmit();
+
+            // Only EMIT if the window close time does progress
+            return lastEmitWindowCloseTime == ConsumerRecord.NO_TIMESTAMP || 
lastEmitWindowCloseTime < windowCloseTime;
+        }
+
+        private long emitRangeLowerBound() {
+            return Math.max(0L, lastEmitWindowCloseTime);
+        }
+
+        private long emitRangeUpperBound(final long windowCloseTime) {
+            // Session window's start and end timestamps are inclusive, so
+            // we should minus 1 for the inclusive closed window-end upper 
bound
+            return windowCloseTime - 1;
+        }
+
+        private boolean shouldRangeFetch(final long emitRangeLowerBound, final 
long emitRangeUpperBound) {

Review Comment:
   This is one minor bug I detected in the latest commit.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java:
##########
@@ -52,6 +53,16 @@ private SessionStore<Bytes, byte[]> maybeWrapCaching(final 
SessionStore<Bytes, b
         if (!enableCaching) {
             return inner;
         }
+
+        if (!inner.persistent()) {

Review Comment:
   Here's the other change I made to work around the current tricky 
situation: since in-memory stores are always "time ordered" as well, we strip 
the caching if the inner store is not persistent. cc @mjsax 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to