ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r481518121
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +121,28 @@ public void process(final K key, final V value) {
             }

             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding window
-            if (timestamp < windows.timeDifferenceMs()) {
-                log.warn(
-                    "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
-                droppedRecordsSensor.record();
-                return;
+            Method backwardsIterator = null;
+
+            try {
+                windowStore.getClass().getMethod("backwardFetch", new Class[] { Object.class, Object.class, Instant.class, Instant.class });
+            } catch (NoSuchMethodException | SecurityException e) {
             }
+            if (backwardsIterator != null) {
+                processReverse(key, value, timestamp);
+            } else {
+                processInOrder(key, value, timestamp);

Review comment:
1) I'm not sure I understand the usage of `backwardsIterator` here. Do we ever set it to anything? As written, the result of `getMethod` is discarded, so it stays null.

2) I think you're overcomplicating this 🙂 All you need to do is call `windowStore.backwardFetch(...)`, and if the underlying store doesn't support it, it will throw UnsupportedOperationException. You don't need to use reflection/`getMethod`. Also, if we're ever in a position of catching SecurityException, something has probably gone wrong.

3) Originally I was thinking we should do this in `init` so we don't have to figure out if it's a reverse store every time a new record gets processed. But I just realized that all of the SessionStore fetch methods require a key, so we have to do this in `process` (since we don't have a key to pass in during `init`, and null keys aren't allowed).
We can at least do it just once, in the first `process`, and then keep track of whether to use forwards or reverse iteration in subsequent calls.

Given the above (especially 3), there's no perfect solution, but one thing we can do is keep a `reverseIterationPossible` boolean. If it's false we call `processInOrder`; if it's true we call `processReverse`. We also put a `catch UnsupportedOperationException` around the `processReverse` call, so if it does throw on the first invocation of `process`, we can call `processInOrder` and also set `reverseIterationPossible` to false so that we never call `processReverse` again. Does that make sense?
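
To make the `reverseIterationPossible` suggestion concrete, here is a rough sketch of the control flow only. It is not the actual processor code: `SketchStore`, `ForwardOnlyStore`, `ReverseFallbackSketch`, and the `trace` list are hypothetical stand-ins, and the fetch signatures are simplified rather than taken from the real store interfaces. The one assumption carried over from the comment is that a store without reverse support throws UnsupportedOperationException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified store; NOT the Kafka Streams store API.
interface SketchStore {
    // Assumed to throw UnsupportedOperationException if reverse iteration is unsupported.
    List<Long> backwardFetch(String key, long from, long to);

    List<Long> fetch(String key, long from, long to);
}

// Hypothetical store that only supports forward iteration.
class ForwardOnlyStore implements SketchStore {
    int backwardAttempts = 0;

    @Override
    public List<Long> backwardFetch(String key, long from, long to) {
        backwardAttempts++;
        throw new UnsupportedOperationException("reverse iteration not supported");
    }

    @Override
    public List<Long> fetch(String key, long from, long to) {
        return Collections.emptyList();
    }
}

class ReverseFallbackSketch {
    private final SketchStore store;
    // Optimistically assume reverse iteration works until the store says otherwise.
    private boolean reverseIterationPossible = true;
    // Records which path handled each record; only here to make the behavior observable.
    final List<String> trace = new ArrayList<>();

    ReverseFallbackSketch(SketchStore store) {
        this.store = store;
    }

    void process(String key, long timestamp) {
        if (reverseIterationPossible) {
            try {
                processReverse(key, timestamp);
                return;
            } catch (UnsupportedOperationException e) {
                // Remember the failure so later records skip the reverse attempt.
                reverseIterationPossible = false;
            }
        }
        processInOrder(key, timestamp);
    }

    private void processReverse(String key, long timestamp) {
        // The fetch comes first, so an unsupported store fails before any work is done.
        store.backwardFetch(key, 0L, timestamp);
        trace.add("reverse");
    }

    private void processInOrder(String key, long timestamp) {
        store.fetch(key, 0L, timestamp);
        trace.add("inOrder");
    }

    public static void main(String[] args) {
        ForwardOnlyStore store = new ForwardOnlyStore();
        ReverseFallbackSketch processor = new ReverseFallbackSketch(store);
        processor.process("a", 10L);
        processor.process("a", 20L);
        System.out.println(processor.trace + ", reverse attempts: " + store.backwardAttempts);
    }
}
```

With a forward-only store, the first `process` call attempts the reverse path once, falls back, and every later call goes straight to `processInOrder`. One caveat with this shape: the fallback is only safe if `processReverse` cannot throw after it has already modified any state, which is why the sketch does the fetch before anything else.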