mjsax commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158814912
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
@Override
public ValueAndTimestamp<VOut> get(final K key) {
- final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+ return computeJoin(key, valueGetter1::get, valueGetter2::get);
+ }
+
+ @Override
+ public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {
Review Comment:
Wondering if this is correct? This "ValueGetter" queries the (non-materialized) result table of an inner join and re-computes the result on the fly? Maybe it will become clear in a follow-up PR, but I would love to get a short, high-level explanation of why it's the right thing to do (as I don't understand it right now).

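To make the question concrete, here is a minimal, self-contained sketch of the pattern the diff appears to introduce: both `get(key)` and `get(key, asOfTimestamp)` delegate to a shared `computeJoin` that re-derives the inner-join result from the two input getters on every call, with no result store involved. Everything here is hypothetical and simplified, not the actual Kafka Streams code: `VT` stands in for `ValueAndTimestamp`, `VersionedLookup` and `InMemoryVersionedTable` stand in for the (versioned) input value getters, and the result timestamp is assumed to be the larger of the two input timestamps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

final class JoinGetterDemo {
    public static void main(String[] args) {
        InMemoryVersionedTable<String, String> a = new InMemoryVersionedTable<>();
        InMemoryVersionedTable<String, String> b = new InMemoryVersionedTable<>();
        a.put("k", "a0", 0L);
        b.put("k", "b2", 2L);
        InnerJoinValueGetter<String, String, String, String> joined =
            new InnerJoinValueGetter<>(a, b, (x, y) -> "(" + x + ", " + y + ")");
        System.out.println(joined.get("k"));     // latest versions of both sides
        System.out.println(joined.get("k", 1L)); // as of 1: B has no version yet
    }
}

/** Hypothetical stand-in for Kafka's ValueAndTimestamp: a value plus its timestamp. */
record VT<V>(V value, long timestamp) {}

/** Hypothetical versioned lookup, mirroring the two get(...) overloads in the diff. */
interface VersionedLookup<K, V> {
    VT<V> get(K key);                     // latest version
    VT<V> get(K key, long asOfTimestamp); // version valid at asOfTimestamp
}

/** Hypothetical in-memory versioned input table: key -> (timestamp -> value). */
final class InMemoryVersionedTable<K, V> implements VersionedLookup<K, V> {
    private final Map<K, TreeMap<Long, V>> data = new HashMap<>();

    void put(K key, V value, long timestamp) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    @Override
    public VT<V> get(K key) {
        TreeMap<Long, V> versions = data.get(key);
        if (versions == null || versions.isEmpty()) return null;
        Map.Entry<Long, V> e = versions.lastEntry(); // highest timestamp wins, not last arrival
        return new VT<>(e.getValue(), e.getKey());
    }

    @Override
    public VT<V> get(K key, long asOfTimestamp) {
        TreeMap<Long, V> versions = data.get(key);
        if (versions == null) return null;
        Map.Entry<Long, V> e = versions.floorEntry(asOfTimestamp); // newest version <= asOfTimestamp
        return e == null ? null : new VT<>(e.getValue(), e.getKey());
    }
}

/** Sketch of a non-materialized inner-join value getter that recomputes on every lookup. */
final class InnerJoinValueGetter<K, V1, V2, VOut> implements VersionedLookup<K, VOut> {
    private final VersionedLookup<K, V1> left;
    private final VersionedLookup<K, V2> right;
    private final BiFunction<V1, V2, VOut> joiner;

    InnerJoinValueGetter(VersionedLookup<K, V1> left, VersionedLookup<K, V2> right,
                         BiFunction<V1, V2, VOut> joiner) {
        this.left = left;
        this.right = right;
        this.joiner = joiner;
    }

    @Override
    public VT<VOut> get(K key) {
        // Latest lookup: join the latest versions of both inputs.
        return computeJoin(key, left::get, right::get);
    }

    @Override
    public VT<VOut> get(K key, long asOfTimestamp) {
        // As-of lookup: bind asOfTimestamp into both input lookups, then reuse the same join logic.
        return computeJoin(key, k -> left.get(k, asOfTimestamp), k -> right.get(k, asOfTimestamp));
    }

    // The result is re-computed from the inputs on every call; no result store is read.
    private VT<VOut> computeJoin(K key, Function<K, VT<V1>> lookupLeft, Function<K, VT<V2>> lookupRight) {
        VT<V1> l = lookupLeft.apply(key);
        if (l == null) return null; // inner join: no left value, no result
        VT<V2> r = lookupRight.apply(key);
        if (r == null) return null; // inner join: no right value, no result
        return new VT<>(joiner.apply(l.value(), r.value()), Math.max(l.timestamp(), r.timestamp()));
    }
}
```

With `a0` at timestamp 0 and `b2` at timestamp 2, the latest lookup joins to `(a0, b2)` with timestamp 2, while an as-of lookup at timestamp 1 finds no B value and returns `null`.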
What I am worried about is the case where we don't update the result for out-of-order updates, but when we re-compute the result on the fly, we might compute an "updated" result and thus not return the same record as previously emitted.

You call this out in the second example of the "Proposal" section of the KIP:
```
A: (timestamp = 0, value = a0)
B: (timestamp = 2, value = b2) -> latest record on B side, emits (a0, b2) with timestamp=2
A: (timestamp = 5, value = a5) -> latest record on A side, emits (a5, b2) with timestamp=5
A: (timestamp = 1, value = a1) -> out-of-order record on A side, no new join result emitted. the older (a0, b2) join result is no longer correct.
```
We don't emit anything for the last record, while the older join result becomes invalid (that's OK). But if we call `get(key,1)` later, would we get back the older (not updated) `(a0,b2)`, or would we get back a now-updated result `(a1,b2)`? Returning the older result would be correct from a consistency point of view: the value-getter should return the same thing as if the result state store had been materialized and the value-getter were not in use.
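The divergence asked about above can be reproduced with a hypothetical single-key simulation of the KIP example. It assumes (this is not Kafka code) that both inputs are versioned, that a join result is emitted only when a record becomes the latest version on its side, and that a materialized result store would hold exactly the emitted results. `materializedAsOf` models a lookup against such a store; `recomputedAsOf` models the on-the-fly recomputation.

```java
import java.util.Map;
import java.util.TreeMap;

final class OutOfOrderJoinDemo {
    // Versioned inputs for a single key: timestamp -> value.
    private final TreeMap<Long, String> a = new TreeMap<>();
    private final TreeMap<Long, String> b = new TreeMap<>();
    // Hypothetical materialized result store: only results that were actually emitted.
    private final TreeMap<Long, String> emitted = new TreeMap<>();

    void putA(long ts, String value) { put(a, ts, value); }
    void putB(long ts, String value) { put(b, ts, value); }

    private void put(TreeMap<Long, String> side, long ts, String value) {
        // Assumption from the KIP example: only a record that becomes the latest version emits.
        boolean isLatest = side.isEmpty() || ts >= side.lastKey();
        side.put(ts, value);
        if (isLatest && !a.isEmpty() && !b.isEmpty()) {
            long resultTs = Math.max(a.lastKey(), b.lastKey());
            emitted.put(resultTs, join(a.lastEntry().getValue(), b.lastEntry().getValue()));
        }
    }

    /** What a materialized result store would return as of asOfTimestamp. */
    String materializedAsOf(long asOfTimestamp) {
        Map.Entry<Long, String> e = emitted.floorEntry(asOfTimestamp);
        return e == null ? null : e.getValue();
    }

    /** What an on-the-fly recomputation from the versioned inputs returns as of asOfTimestamp. */
    String recomputedAsOf(long asOfTimestamp) {
        Map.Entry<Long, String> left = a.floorEntry(asOfTimestamp);
        Map.Entry<Long, String> right = b.floorEntry(asOfTimestamp);
        return (left == null || right == null) ? null : join(left.getValue(), right.getValue());
    }

    private static String join(String left, String right) {
        return "(" + left + ", " + right + ")";
    }

    /** Replays the record sequence from the KIP example. */
    static OutOfOrderJoinDemo replayKipExample() {
        OutOfOrderJoinDemo d = new OutOfOrderJoinDemo();
        d.putA(0, "a0");
        d.putB(2, "b2"); // emits (a0, b2) at timestamp 2
        d.putA(5, "a5"); // emits (a5, b2) at timestamp 5
        d.putA(1, "a1"); // out-of-order: nothing emitted
        return d;
    }

    public static void main(String[] args) {
        OutOfOrderJoinDemo d = replayKipExample();
        for (long ts : new long[] {1, 3, 5}) {
            System.out.println("as of " + ts + ": materialized=" + d.materializedAsOf(ts)
                + ", recomputed=" + d.recomputedAsOf(ts));
        }
    }
}
```

Under these assumptions, as of timestamp 1 both return `null`, because B has no version before timestamp 2; as of timestamp 3 the materialized store returns `(a0, b2)` while recomputation returns `(a1, b2)`; as of timestamp 5 both return `(a5, b2)`. So in this sketch the mismatch shows up for as-of timestamps 2 through 4.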
--
This is an automated message from the Apache Git Service.