guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r880778694


##########
streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java:
##########
@@ -39,6 +39,13 @@
  */
 public interface SessionStore<K, AGG> extends StateStore, ReadOnlySessionStore<K, AGG> {
 
+    // TODO: javadoc; both ends are inclusive
+    default KeyValueIterator<Windowed<K>, AGG> findSessions(final Instant earliestSessionEndTime,

Review Comment:
   This relates to 1) in the description and to the first open question: is 
this public API worth adding? Note that I added it to SessionStore rather than 
ReadOnlySessionStore so that it is not exposed via IQv1, and I have only added 
this function for the `Instant` param type.
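
   For illustration, here is a minimal sketch of the intended semantics: a lookup 
by session end time in which both bounds are inclusive. `ToySession` and 
`ToySessionIndex` are hypothetical stand-ins, not Kafka classes; only the method 
name and parameter names are taken from the diff.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stored session; not a Kafka class.
final class ToySession {
    final String key;
    final long startMs;
    final long endMs;

    ToySession(final String key, final long startMs, final long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }
}

// Hypothetical in-memory index used only to show the bound semantics.
final class ToySessionIndex {
    private final List<ToySession> sessions = new ArrayList<>();

    void put(final ToySession session) {
        sessions.add(session);
    }

    // Returns every session whose END time lies in
    // [earliestSessionEndTime, latestSessionEndTime], both ends inclusive.
    List<ToySession> findSessions(final Instant earliestSessionEndTime,
                                  final Instant latestSessionEndTime) {
        final long earliest = earliestSessionEndTime.toEpochMilli();
        final long latest = latestSessionEndTime.toEpochMilli();
        final List<ToySession> result = new ArrayList<>();
        for (final ToySession session : sessions) {
            if (session.endMs >= earliest && session.endMs <= latest) {
                result.add(session);
            }
        }
        return result;
    }
}
```

   A session that ends exactly at either bound is returned, which is the 
behaviour the TODO ("both ends are inclusive") asks the javadoc to spell out.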



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null, record.timestamp());
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                         */
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+                /*
                 tupleForwarder.maybeForward(
                     record.withKey(sessionKey)
                         .withValue(new Change<>(agg, null)));
+                 */
             }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg,
+                                        final long oldTimestamp) {
+            if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if possible
+            final long newTimestamp = windowedkey.key() != null ? windowedkey.window().end() : oldTimestamp;
+
+            tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+        }
+
+        // TODO: consolidate SessionWindow with TimeWindow to merge common functions

Review Comment:
   I realize that our SessionWindow, TimeWindow, and even SlidingWindow cause a 
lot of code duplication (e.g. here). We could consolidate them into a single 
class, with boolean flags indicating whether the start/end are inclusive or 
exclusive, which would further reduce the duplication. I will file a JIRA for 
it.
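
   A minimal sketch of what such a consolidated class could look like, assuming 
nothing about the eventual design. `BoundedWindow` and its factory methods are 
hypothetical names; the only behaviour shown is a `contains` check driven by the 
two inclusivity flags.

```java
// Hypothetical consolidated window type; not part of Kafka's API.
final class BoundedWindow {
    private final long startMs;
    private final long endMs;
    private final boolean startInclusive;
    private final boolean endInclusive;

    BoundedWindow(final long startMs,
                  final long endMs,
                  final boolean startInclusive,
                  final boolean endInclusive) {
        if (startMs > endMs) {
            throw new IllegalArgumentException("start must not be after end");
        }
        this.startMs = startMs;
        this.endMs = endMs;
        this.startInclusive = startInclusive;
        this.endInclusive = endInclusive;
    }

    // Both bounds inclusive, in the style of a session window.
    static BoundedWindow sessionLike(final long startMs, final long endMs) {
        return new BoundedWindow(startMs, endMs, true, true);
    }

    // Start inclusive, end exclusive, in the style of a time window.
    static BoundedWindow timeLike(final long startMs, final long endMs) {
        return new BoundedWindow(startMs, endMs, true, false);
    }

    // One shared implementation instead of one per window class.
    boolean contains(final long timestampMs) {
        final boolean afterStart = startInclusive ? timestampMs >= startMs : timestampMs > startMs;
        final boolean beforeEnd = endInclusive ? timestampMs <= endMs : timestampMs < endMs;
        return afterStart && beforeEnd;
    }
}
```

   With the flags carried on the instance, shared helpers such as the 
window-close check would only need to be written once.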



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java:
##########
@@ -38,7 +39,7 @@
     @SuppressWarnings({"unchecked", "rawtypes"})
     TimestampedTupleForwarder(final StateStore store,
                               final ProcessorContext<K, Change<V>> context,
-                              final TimestampedCacheFlushListener<K, V> flushListener,
+                              final CacheFlushListener<K, ?> flushListener,

Review Comment:
   This is 3) in the description: since we can use the base class, we no longer 
need the duplicated TupleForwarder.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,

Review Comment:
   This is a minor fix to the param names: the old ones were simply wrong and 
misleading.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null, record.timestamp());
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                         */
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+                /*

Review Comment:
   Will remove the commented-out code when removing WIP; ditto elsewhere.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java:
##########
@@ -91,8 +91,8 @@ public interface SegmentedBytesStore extends StateStore {
     /**
      * Gets all the key-value pairs that belong to the windows within in the given time range.
      *
-     * @param from the beginning of the time slot from which to search
-     * @param to   the end of the time slot from which to search
+     * @param from the beginning of the time slot from which to search (inclusive)

Review Comment:
   Minor javadoc improvement to remind developers.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   This is the second open question: with the current prefixed (base, i.e. 
time-first) session key schema, this fetchAll effectively searches 
`[earliestEnd, INF]` because of this logic: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java#L46
   
   This happens because we translate the range query without a key inside 
`AbstractRocksDBTimeOrderedSegmentedBytesStore` by using `lower/upperRange` 
instead of `lower/upperRangeFixedSize`: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java#L241-L242
   
   I cannot remember why we need to do this. @lihaosky @mjsax do you remember 
why?
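
   To make the difference between the two effective ranges concrete, here is a 
toy model of a time-first key layout. It is not Kafka's actual schema or range 
logic: the byte layout (8-byte big-endian end time followed by the raw key), the 
class `TimeFirstScanSketch`, and all method names are assumptions made for 
illustration. It needs Java 9+ for `Arrays.compareUnsigned`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

// Toy model only; the layout below is an assumption, not the Kafka schema.
final class TimeFirstScanSketch {
    // Unsigned lexicographic byte order, as used by byte-ordered stores.
    private static final Comparator<byte[]> UNSIGNED_LEX = Arrays::compareUnsigned;

    // Assumed layout: 8-byte big-endian end time, then the raw key bytes.
    // End times are assumed non-negative so byte order matches numeric order.
    static byte[] encode(final long endMs, final String key) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
            .putLong(endMs)
            .put(keyBytes)
            .array();
    }

    static TreeMap<byte[], String> newStore() {
        return new TreeMap<>(UNSIGNED_LEX);
    }

    // Scan with end time in [earliestEnd, latestEnd], both inclusive.
    // Assumes latestEnd < Long.MAX_VALUE so that latestEnd + 1 does not overflow.
    static List<String> scanBounded(final TreeMap<byte[], String> store,
                                    final long earliestEnd,
                                    final long latestEnd) {
        final byte[] lower = encode(earliestEnd, "");
        final byte[] upperExclusive = encode(latestEnd + 1, "");
        return new ArrayList<>(store.subMap(lower, true, upperExclusive, false).values());
    }

    // Scan with no effective upper bound, i.e. [earliestEnd, INF].
    static List<String> scanOpenEnded(final TreeMap<byte[], String> store,
                                      final long earliestEnd) {
        return new ArrayList<>(store.tailMap(encode(earliestEnd, ""), true).values());
    }
}
```

   In this model the open-ended scan also returns entries whose end time is 
later than `latestEnd`, so a caller expecting `[earliestEnd, latestEnd]` would 
have to filter the extra entries itself.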
   
   


