mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158814912


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
   Wondering if this is correct? This "ValueGetter" queries the 
(non-materialized) result table of an inner-join, and re-computes the result 
on-the-fly? Maybe it will become clear in a follow-up PR, but I would love to 
get a short, high-level explanation of why it's the right thing to do (as I 
don't understand it right now).
   
   What I am worried about is the case where we don't update the result for 
out-of-order updates, but when we re-compute the result on the fly, we might 
compute an "updated" result, and thus not return the same record as 
previously emitted.
   
   You call this out in the second example of the "Proposal" section on the KIP:
   ```
   A: (timestamp = 0, value = a0)
   B: (timestamp = 2, value = b2) -> latest record on B side, emits (a0, b2) with timestamp=2
   A: (timestamp = 5, value = a5) -> latest record on A side, emits (a5, b2) with timestamp=5
   A: (timestamp = 1, value = a1) -> out-of-order record on A side, no new join result emitted. the older (a0, b2) join result is no longer correct.
   ```
   
   We don't emit anything for the last record, even though the older join 
result becomes invalid (that's ok). But if we call `get(key,1)` later, would we 
get back the older (not updated) `(a0,b2)`, or a now-updated result `(a1,b2)`? 
Returning the older result would be correct from a consistency point of view, 
because the value-getter should return the same thing as if the result state 
store had been materialized and the value-getter were not in use.
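
To make the concern concrete, here is a minimal, self-contained sketch that replays the KIP example. It does not use the Kafka Streams API. The class `JoinAsOfSketch`, its methods, and the `TreeMap`-based "stores" are hypothetical stand-ins, and the emit rule (join only when the incoming record is the latest on its side, stamp the result with the larger of the two timestamps) is my reading of the example, not the actual implementation. Under these assumptions the two lookups agree for a query as of timestamp 1 (both return null, since B has no record at or before 1) and diverge for a query as of timestamp 2, the timestamp of the emitted `(a0, b2)`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of one key; not Kafka Streams code.
class JoinAsOfSketch {
    // Per-side version history: record timestamp -> value.
    final TreeMap<Long, String> a = new TreeMap<>();
    final TreeMap<Long, String> b = new TreeMap<>();
    // What a materialized result store would hold: result timestamp -> join result.
    final TreeMap<Long, String> emitted = new TreeMap<>();

    // Latest version with timestamp <= asOfTimestamp, or null if there is none.
    static String asOf(TreeMap<Long, String> history, long asOfTimestamp) {
        Map.Entry<Long, String> e = history.floorEntry(asOfTimestamp);
        return e == null ? null : e.getValue();
    }

    // Assumed emit rule: only a record that is the latest on its side triggers a join.
    void process(boolean sideA, long ts, String value) {
        TreeMap<Long, String> own = sideA ? a : b;
        TreeMap<Long, String> other = sideA ? b : a;
        boolean latest = own.isEmpty() || ts >= own.lastKey();
        own.put(ts, value);
        if (latest && !other.isEmpty()) {
            String otherValue = other.lastEntry().getValue();
            String joined = sideA
                ? "(" + value + "," + otherValue + ")"
                : "(" + otherValue + "," + value + ")";
            emitted.put(Math.max(ts, other.lastKey()), joined);
        }
    }

    // Lookup against the (hypothetically) materialized result.
    String materializedGet(long asOfTimestamp) {
        return asOf(emitted, asOfTimestamp);
    }

    // Lookup that re-computes the inner join from both sides on the fly.
    String onTheFlyGet(long asOfTimestamp) {
        String av = asOf(a, asOfTimestamp);
        String bv = asOf(b, asOfTimestamp);
        return (av == null || bv == null) ? null : "(" + av + "," + bv + ")";
    }

    // Replays the four records of the KIP example in arrival order.
    static JoinAsOfSketch replayKipExample() {
        JoinAsOfSketch s = new JoinAsOfSketch();
        s.process(true, 0, "a0");
        s.process(false, 2, "b2");
        s.process(true, 5, "a5");
        s.process(true, 1, "a1"); // out-of-order: stored, but nothing emitted
        return s;
    }

    public static void main(String[] args) {
        JoinAsOfSketch s = replayKipExample();
        System.out.println("materialized as of 2: " + s.materializedGet(2));
        System.out.println("on-the-fly   as of 2: " + s.onTheFlyGet(2));
    }
}
```

In this model the materialized lookup as of 2 returns the previously emitted `(a0,b2)`, while the on-the-fly lookup returns `(a1,b2)`, because the out-of-order `a1` is visible in A's history even though it never triggered an emit.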



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

