guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r880778694
##########
streams/src/main/java/org/apache/kafka/streams/state/SessionStore.java:
##########
@@ -39,6 +39,13 @@
*/
public interface SessionStore<K, AGG> extends StateStore,
ReadOnlySessionStore<K, AGG> {
+ // TODO: javadoc; both ends are inclusive
+ default KeyValueIterator<Windowed<K>, AGG> findSessions(final Instant earliestSessionEndTime,
Review Comment:
This is related to 1) in the description and to the first open question: is this public API worth adding? Note that I added it to SessionStore rather than ReadOnlySessionStore so that it is not exposed via IQv1, and I have only added this function for the `Instant` parameter type.
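To make the proposed semantics concrete, here is a minimal sketch. It assumes both ends of the end-time range are inclusive (as the TODO above says) and models the store as a plain in-memory map keyed by session end time and then start time. `SessionIndexSketch` and its methods are hypothetical names for illustration, not Kafka Streams classes, and the sketch skips the millisecond validation the real stores perform.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model (not Kafka Streams code): session end time -> start time -> aggregate.
class SessionIndexSketch<V> {
    private final NavigableMap<Long, NavigableMap<Long, V>> byEndThenStart = new TreeMap<>();

    void put(final long startMs, final long endMs, final V agg) {
        byEndThenStart.computeIfAbsent(endMs, e -> new TreeMap<>()).put(startMs, agg);
    }

    // Returns the aggregates of all sessions whose END time lies in
    // [earliestSessionEndTime, latestSessionEndTime], both ends inclusive.
    List<V> findSessions(final Instant earliestSessionEndTime, final Instant latestSessionEndTime) {
        final long from = earliestSessionEndTime.toEpochMilli();
        final long to = latestSessionEndTime.toEpochMilli();
        final List<V> result = new ArrayList<>();
        if (from > to) {
            return result; // empty range; TreeMap.subMap would throw here
        }
        // subMap(from, true, to, true): both bounds included.
        for (final NavigableMap<Long, V> byStart : byEndThenStart.subMap(from, true, to, true).values()) {
            result.addAll(byStart.values());
        }
        return result;
    }
}
```

For example, with sessions ending at 10, 20 and 30 ms, a query for `[10 ms, 20 ms]` returns the first two, because both endpoints are included.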
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
}
}
- if (mergedWindow.end() < closeTime) {
- if (context().recordMetadata().isPresent()) {
- final RecordMetadata recordMetadata = context().recordMetadata().get();
- LOG.warn(
- "Skipping record for expired window. " +
- "topic=[{}] " +
- "partition=[{}] " +
- "offset=[{}] " +
- "timestamp=[{}] " +
- "window=[{},{}] " +
- "expiration=[{}] " +
- "streamTime=[{}]",
- recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
- timestamp,
- mergedWindow.start(), mergedWindow.end(),
- closeTime,
- observedStreamTime
- );
- } else {
- LOG.warn(
- "Skipping record for expired window. Topic, partition, and offset not known. " +
- "timestamp=[{}] " +
- "window=[{},{}] " +
- "expiration=[{}] " +
- "streamTime=[{}]",
- timestamp,
- mergedWindow.start(), mergedWindow.end(),
- closeTime,
- observedStreamTime
- );
- }
- droppedRecordsSensor.record();
+ if (mergedWindow.end() < windowCloseTime) {
+ logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
} else {
if (!mergedWindow.equals(newSessionWindow)) {
for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
store.remove(session.key);
+
+ maybeForwardUpdate(session.key, session.value, null, record.timestamp());
+ /*
tupleForwarder.maybeForward(
record.withKey(session.key)
.withValue(new Change<>(null, session.value)));
+ */
}
}
agg = aggregator.apply(record.key(), record.value(), agg);
final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
store.put(sessionKey, agg);
+
+ maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+ /*
tupleForwarder.maybeForward(
record.withKey(sessionKey)
.withValue(new Change<>(agg, null)));
+ */
}
+
+ maybeForwardFinalResult(record, windowCloseTime);
+ }
+
+ private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+ final VAgg oldAgg,
+ final VAgg newAgg,
+ final long oldTimestamp) {
+ if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+ return;
+ }
+
+ // Update the sent record timestamp to the window end time if possible
+ final long newTimestamp = windowedkey.key() != null ? windowedkey.window().end() : oldTimestamp;
+
+ tupleForwarder.maybeForward(new Record<>(windowedkey, new Change<>(newAgg, sendOldValues ? oldAgg : null), newTimestamp));
+ }
+
+ // TODO: consolidate SessionWindow with TimeWindow to merge common functions
Review Comment:
I realized that SessionWindow, TimeWindow, and even SlidingWindow cause a lot of code duplication (e.g. here). We could consolidate them into a single class with boolean flags indicating whether the start and end are inclusive or exclusive, which would further reduce duplication. Will file a JIRA for it.
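A rough sketch of what such a consolidated class could look like, assuming discrete millisecond timestamps, non-negative start times, and the existing conventions that session windows are closed `[start, end]` while time windows are half-open `[start, end)`. `FlaggedWindow` and its factory methods are hypothetical names; the eventual design in the JIRA may well differ.

```java
// Hypothetical consolidated window (not a Kafka Streams class): one type whose
// start/end inclusiveness is carried by flags instead of by separate classes.
final class FlaggedWindow {
    private final long start;
    private final long end;
    private final boolean startInclusive;
    private final boolean endInclusive;

    FlaggedWindow(final long start, final long end, final boolean startInclusive, final boolean endInclusive) {
        if (start < 0 || start > end) {
            throw new IllegalArgumentException("require 0 <= start <= end");
        }
        if (!startInclusive && start == Long.MAX_VALUE) {
            throw new IllegalArgumentException("exclusive start at Long.MAX_VALUE would overflow");
        }
        this.start = start;
        this.end = end;
        this.startInclusive = startInclusive;
        this.endInclusive = endInclusive;
    }

    // Session-window convention: closed interval [start, end].
    static FlaggedWindow session(final long start, final long end) {
        return new FlaggedWindow(start, end, true, true);
    }

    // Time-window convention: half-open interval [start, end).
    static FlaggedWindow time(final long start, final long end) {
        return new FlaggedWindow(start, end, true, false);
    }

    // First and last millisecond actually covered (an empty window has first > last).
    private long first() {
        return startInclusive ? start : start + 1;
    }

    private long last() {
        return endInclusive ? end : end - 1; // end >= 0, so end - 1 cannot overflow
    }

    boolean contains(final long timestamp) {
        return timestamp >= first() && timestamp <= last();
    }

    // One overlap check for every window flavor, instead of one per class.
    boolean overlap(final FlaggedWindow other) {
        return Math.max(first(), other.first()) <= Math.min(last(), other.last());
    }
}
```

With this shape, `session(0, 10)` and `session(10, 20)` overlap at 10, while `time(0, 10)` and `time(10, 20)` do not, and the difference lives in the flags rather than in duplicated methods.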
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java:
##########
@@ -38,7 +39,7 @@
@SuppressWarnings({"unchecked", "rawtypes"})
TimestampedTupleForwarder(final StateStore store,
final ProcessorContext<K, Change<V>> context,
- final TimestampedCacheFlushListener<K, V>
flushListener,
+ final CacheFlushListener<K, ?> flushListener,
Review Comment:
This is 3) in the description: since we can use the base class here, we no longer need the duplicated TupleForwarder.
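Why the base type is sufficient can be shown with a small sketch: a parameter typed as the base listener interface with a wildcard value type accepts both a listener over wrapped (timestamped) values and a listener over plain values, so one forwarder covers both cases. All `*Sketch` types below are hypothetical stand-ins, not the actual CacheFlushListener hierarchy.

```java
// Hypothetical stand-in for the base listener interface (not the Kafka type).
interface FlushListenerSketch<K, V> {
    void apply(K key, V newValue, V oldValue, long timestamp);
}

// Hypothetical wrapper value, loosely analogous to a value carrying its timestamp.
final class TimestampedValueSketch<V> {
    final V value;
    final long timestamp;

    TimestampedValueSketch(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Listener whose value type is the wrapper (the "timestamped" flavor).
final class TimestampedListenerSketch<K, V> implements FlushListenerSketch<K, TimestampedValueSketch<V>> {
    @Override
    public void apply(final K key, final TimestampedValueSketch<V> newValue,
                      final TimestampedValueSketch<V> oldValue, final long timestamp) {
        // would forward newValue.value downstream
    }
}

// Listener over plain values (e.g. the session flavor).
final class PlainListenerSketch<K, V> implements FlushListenerSketch<K, V> {
    @Override
    public void apply(final K key, final V newValue, final V oldValue, final long timestamp) {
        // would forward newValue downstream
    }
}

// A single forwarder: the base interface plus a wildcard value type accepts
// either listener flavor, so no second, duplicated forwarder class is needed.
final class ForwarderSketch<K> {
    private final FlushListenerSketch<K, ?> flushListener;

    ForwarderSketch(final FlushListenerSketch<K, ?> flushListener) {
        this.flushListener = flushListener;
    }

    FlushListenerSketch<K, ?> listener() {
        return flushListener;
    }
}
```

Had the parameter stayed typed as the timestamped listener, only the first flavor would compile, which is what forced a second forwarder before.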
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
@Override
public byte[] fetchSession(final Bytes key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime) {
+ final long sessionStartTime,
Review Comment:
This is a minor fix to the parameter names: the old ones were simply wrong and misleading.
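For context, `fetchSession` looks up one exact session rather than searching a range, so its two time arguments are the session's own start and end times. Below is a minimal sketch under that reading, assuming a simplified nested map of end time, then key, then start time. `ExactSessionLookupSketch` is hypothetical, uses `String` keys instead of `Bytes` for brevity, and `sessionEndTime` is the sketch's own name for the second parameter (the excerpt above only shows `sessionStartTime`).

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified index (not Kafka code): end time -> key -> start time -> aggregate.
final class ExactSessionLookupSketch {
    private final NavigableMap<Long, NavigableMap<String, NavigableMap<Long, byte[]>>> endTimeMap = new TreeMap<>();

    void put(final String key, final long sessionStartTime, final long sessionEndTime, final byte[] agg) {
        endTimeMap.computeIfAbsent(sessionEndTime, e -> new TreeMap<>())
                  .computeIfAbsent(key, k -> new TreeMap<>())
                  .put(sessionStartTime, agg);
    }

    // Both times identify the session's own bounds, not a search range,
    // which is why names like "earliestSessionEndTime" were misleading.
    byte[] fetchSession(final String key, final long sessionStartTime, final long sessionEndTime) {
        final NavigableMap<String, NavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
        if (keyMap == null) {
            return null;
        }
        final NavigableMap<Long, byte[]> startMap = keyMap.get(key);
        return startMap == null ? null : startMap.get(sessionStartTime);
    }
}
```

A lookup succeeds only when the key, start time and end time all match a stored session exactly.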
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
agg = aggregator.apply(record.key(), record.value(), agg);
final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
store.put(sessionKey, agg);
+
+ maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+ /*
Review Comment:
Will remove the commented-out code before dropping the WIP status; ditto elsewhere.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java:
##########
@@ -91,8 +91,8 @@ public interface SegmentedBytesStore extends StateStore {
/**
* Gets all the key-value pairs that belong to the windows within in the given time range.
*
- * @param from the beginning of the time slot from which to search
- * @param to the end of the time slot from which to search
+ * @param from the beginning of the time slot from which to search (inclusive)
Review Comment:
Minor javadoc improvement to remind developers whether the bounds of the time range are inclusive.
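Why inclusiveness matters for a segmented store can be sketched as follows, assuming fixed-size segments with `segmentId = timestamp / segmentInterval` for non-negative timestamps: with an inclusive `to`, a record stamped exactly at `to` may sit in one more segment, which then also has to be scanned. `SegmentRangeSketch` is a hypothetical helper, not the store's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not the store's code): maps an inclusive time range onto
// fixed-size segments, assuming segmentId = timestamp / segmentInterval.
final class SegmentRangeSketch {
    private final long segmentInterval;

    SegmentRangeSketch(final long segmentInterval) {
        if (segmentInterval <= 0) {
            throw new IllegalArgumentException("segmentInterval must be positive");
        }
        this.segmentInterval = segmentInterval;
    }

    long segmentId(final long timestamp) {
        return timestamp / segmentInterval;
    }

    // Both bounds are inclusive: a record stamped exactly at `to` lives in
    // segmentId(to), so that segment has to be scanned as well.
    List<Long> segmentsToScan(final long from, final long to) {
        if (from < 0) {
            throw new IllegalArgumentException("timestamps are assumed non-negative");
        }
        final List<Long> ids = new ArrayList<>();
        if (from > to) {
            return ids; // empty range
        }
        for (long id = segmentId(from); id <= segmentId(to); id++) {
            ids.add(id);
        }
        return ids;
    }
}
```

With a 100 ms interval, `[150, 300]` touches segments 1, 2 and 3, whereas `[150, 299]` only touches 1 and 2; a reader who assumed an exclusive `to` could easily miss that last segment.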
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
);
}
+ @Override
+ public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+ final Instant latestSessionEndTime) {
+ final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+ prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+ final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+ prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+ final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);
Review Comment:
This is the second open question: with the current prefixed (base, i.e. time-first) session key schema, this `fetchAll` would effectively search `[earliestEnd, INF]` because of this logic:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedSessionKeySchemas.java#L46
This happens because, inside `AbstractRocksDBTimeOrderedSegmentedBytesStore`, we translate a range query without a key by using `lower/upperRange` instead of `lower/upperRangeFixedSize`:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java#L241-L242
I cannot remember why we need to do this. @lihaosky @mjsax, do you remember why?
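To illustrate the effect described above (not to answer the open question), here is a sketch assuming a time-first key layout of `[prefix][endTime][startTime][key]`, big-endian longs, non-negative timestamps, and a bytewise unsigned comparator like RocksDB's default. It also assumes, as the linked logic suggests, that a key-less upper bound resolves to the next key prefix. The layout, the prefix value, and all names are hypothetical. The next-prefix bound keeps every session ending at or after `earliestEnd`, while a bound built from `latestEnd` stops where expected.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of a time-first session key: [prefix][endTime][startTime][key].
// Layout, prefix value and names are illustrative assumptions, not Kafka code.
final class TimeFirstRangeSketch {
    private static final byte PREFIX = 0;

    // Bytewise unsigned ordering, like RocksDB's default comparator.
    private final NavigableMap<byte[], String> store =
        new TreeMap<byte[], String>((a, b) -> Arrays.compareUnsigned(a, b));

    private static byte[] sessionKey(final long start, final long end, final String key) {
        final byte[] k = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(1 + 8 + 8 + k.length)
            .put(PREFIX).putLong(end).putLong(start).put(k).array();
    }

    // [prefix][endTime]: sorts before every full key with that end time.
    private static byte[] endTimeBound(final long endTime) {
        return ByteBuffer.allocate(1 + 8).put(PREFIX).putLong(endTime).array();
    }

    void put(final long start, final long end, final String key) {
        store.put(sessionKey(start, end, key), key + "@[" + start + "," + end + "]");
    }

    // Upper bound = next prefix: every time-first key passes, so the scan is [earliestEnd, INF].
    List<String> scanWithNextPrefixUpperBound(final long earliestEnd) {
        final byte[] nextPrefix = {(byte) (PREFIX + 1)};
        return new ArrayList<>(store.subMap(endTimeBound(earliestEnd), true, nextPrefix, false).values());
    }

    // Upper bound from the end time: [prefix][latestEnd + 1] exclusive, i.e. [earliestEnd, latestEnd].
    // Assumes latestEnd < Long.MAX_VALUE.
    List<String> scanWithEndTimeUpperBound(final long earliestEnd, final long latestEnd) {
        if (earliestEnd > latestEnd) {
            return new ArrayList<>();
        }
        return new ArrayList<>(
            store.subMap(endTimeBound(earliestEnd), true, endTimeBound(latestEnd + 1), false).values());
    }
}
```

With sessions ending at 10, 20 and 30, asking for end times in `[10, 20]` returns all three under the next-prefix bound but only the first two under the end-time bound, so the caller would have to filter the extra results.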
--
This is an automated message from the Apache Git Service.