ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r481518121



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##########
@@ -118,24 +121,28 @@ public void process(final K key, final V value) {
             }
 
             final long timestamp = context().timestamp();
-            //don't process records that don't fall within a full sliding window
-            if (timestamp < windows.timeDifferenceMs()) {
-                log.warn(
-                    "Skipping record due to early arrival. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
-                    value, context().topic(), context().partition(), context().offset()
-                );
-                droppedRecordsSensor.record();
-                return;
+            Method backwardsIterator = null;
+
+            try {
+                windowStore.getClass().getMethod("backwardFetch", new Class[] { Object.class, Object.class, Instant.class, Instant.class });
+            } catch (NoSuchMethodException | SecurityException e)  { }
+            if (backwardsIterator != null) {
+                processReverse(key, value, timestamp);
+            } else {
+                processInOrder(key, value, timestamp);

Review comment:
   1) I'm not sure I understand the usage of `backwardsIterator` here. It's
initialized to null and the result of `getMethod` is never assigned to it, so
it looks like we'd always take the `processInOrder` branch. Do we ever set it
to anything?
   2) I think you're overcomplicating this 🙂  All you need to do is call
`windowStore.backwardFetch(...)`, and if the underlying store doesn't support
it, it will throw UnsupportedOperationException. You don't need to use
reflection/`getMethod`. Also, if we're ever in a position of catching
SecurityException, something has probably gone wrong.
   3) Originally I was thinking we should do this in `init` so we don't have to
figure out whether it's a reverse store every time a new record gets processed.
But I just realized that all of the SessionStore fetch methods require a key,
so we have to do this in `process` (since we don't have a key to pass in during
`init`, and null keys aren't allowed). We can at least do it just once, in the
first `process`, and then keep track of whether to use forwards or reverse
iteration in subsequent ones.

   Given the above (especially 3), there's no perfect solution, but one thing
we can do is keep a `reverseIterationPossible` boolean. If it's true we call
`processReverse`; if it's false we call `processInOrder`. We also put a
`catch UnsupportedOperationException` around the `processReverse` call, so that
if it throws on the first invocation of `process`, we can call `processInOrder`
instead and set `reverseIterationPossible` to false so that we never call
`processReverse` again. Does that make sense?
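
Roughly, the flag-plus-fallback idea could look like the sketch below. This is only an illustration, not the actual processor: `ReverseFallbackSketch`, the `Store` interface, its single-argument `backwardFetch`, and the `calls` list are hypothetical stand-ins so the example compiles on its own, and it assumes the store throws before `processReverse` has changed any state.

```java
import java.util.ArrayList;
import java.util.List;

public class ReverseFallbackSketch {

    /** Hypothetical stand-in for the window store; NOT the Kafka Streams interface. */
    interface Store {
        // Assumption: stores without reverse iteration throw UnsupportedOperationException.
        void backwardFetch(String key);
    }

    static class Processor {
        final Store store;
        // Optimistically start with reverse iteration; flipped to false on the first failure.
        boolean reverseIterationPossible = true;
        // Records which path handled each record, for illustration only.
        final List<String> calls = new ArrayList<>();

        Processor(final Store store) {
            this.store = store;
        }

        void process(final String key, final String value, final long timestamp) {
            if (reverseIterationPossible) {
                try {
                    processReverse(key, value, timestamp);
                    return;
                } catch (final UnsupportedOperationException e) {
                    // Remember the result so we never attempt the reverse path again.
                    reverseIterationPossible = false;
                }
            }
            processInOrder(key, value, timestamp);
        }

        private void processReverse(final String key, final String value, final long timestamp) {
            // The fetch comes first, so an unsupported store fails before any state is touched.
            store.backwardFetch(key);
            calls.add("reverse");
        }

        private void processInOrder(final String key, final String value, final long timestamp) {
            calls.add("inOrder");
        }
    }

    public static void main(final String[] args) {
        // A store that does not support reverse iteration.
        final Processor p = new Processor(key -> {
            throw new UnsupportedOperationException();
        });
        p.process("k", "v1", 10L);
        p.process("k", "v2", 20L);
        System.out.println(p.calls); // prints [inOrder, inOrder]
    }
}
```

The first record pays for one failed `backwardFetch` attempt; every later record goes straight to `processInOrder` because the flag is already false.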




