vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1158941945
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
@Override
public ValueAndTimestamp<VOut> get(final K key) {
- final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+ return computeJoin(key, valueGetter1::get, valueGetter2::get);
+ }
+
+ @Override
+ public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {
Review Comment:
> the value-getter should return the same thing as if the result state-store would have been materialized
Why is this true? I agree that this is not happening for this particular
value getter, but I don't understand why this is the property that we wish to
preserve.
Whenever the KTable that results from a processor computation is
materialized, the value getter for that table reads from the materialized store
directly. Only when the result is not materialized may the value getter need to
compute results on the fly. When a join value getter does need to compute on the
fly, it can actually return correct results for a timestamped get (as long as
both inputs are versioned) by performing timestamped lookups on both inputs and
joining the results.
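To make the on-the-fly case concrete, here is a minimal sketch in plain Java. It does not use the Kafka Streams API: `VT`, `VersionedTable`, and this `computeJoin` signature (which takes an explicit joiner) are hypothetical stand-ins, and giving the join result the larger of the two input timestamps is an assumption. It shows how the latest get and the timestamped get can share one join routine and differ only in how each input is looked up, in the spirit of the `computeJoin(key, valueGetter1::get, valueGetter2::get)` call in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for Kafka Streams types; NOT the real API.
public class VersionedJoinSketch {

    // Minimal value + timestamp pair (stand-in for ValueAndTimestamp).
    record VT<V>(V value, long timestamp) { }

    // Toy versioned table: per key, a history of values ordered by timestamp.
    static final class VersionedTable<K, V> {
        private final Map<K, TreeMap<Long, V>> history = new HashMap<>();

        void put(K key, V value, long timestamp) {
            history.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        // Latest value for the key (analogous to get(key)).
        VT<V> get(K key) {
            return get(key, Long.MAX_VALUE);
        }

        // Version valid as of the given timestamp: largest timestamp <= asOfTimestamp.
        VT<V> get(K key, long asOfTimestamp) {
            TreeMap<Long, V> versions = history.get(key);
            if (versions == null) return null;
            Map.Entry<Long, V> e = versions.floorEntry(asOfTimestamp);
            return e == null ? null : new VT<>(e.getValue(), e.getKey());
        }
    }

    // Shared inner-join logic: callers decide how each side is looked up
    // (latest vs. as-of a timestamp), so both get() variants reuse it.
    static <K, V1, V2, VOut> VT<VOut> computeJoin(
            K key,
            Function<K, VT<V1>> getter1,
            Function<K, VT<V2>> getter2,
            BiFunction<V1, V2, VOut> joiner) {
        VT<V1> left = getter1.apply(key);
        if (left == null || left.value() == null) return null;
        VT<V2> right = getter2.apply(key);
        if (right == null || right.value() == null) return null;
        // Assumption: the join result carries the larger of the two input timestamps.
        return new VT<>(joiner.apply(left.value(), right.value()),
                Math.max(left.timestamp(), right.timestamp()));
    }

    public static void main(String[] args) {
        VersionedTable<String, String> t1 = new VersionedTable<>();
        VersionedTable<String, String> t2 = new VersionedTable<>();
        t1.put("k", "a", 10);
        t2.put("k", "x", 20);
        t1.put("k", "b", 30);

        BiFunction<String, String, String> joiner = (l, r) -> l + "+" + r;

        // Latest join result, computed on the fly from the latest input versions.
        VT<String> latest = computeJoin("k", t1::get, t2::get, joiner);
        // Join result as of t=25, computed from timestamped lookups on both inputs.
        long asOf = 25;
        VT<String> past = computeJoin("k", k -> t1.get(k, asOf), k -> t2.get(k, asOf), joiner);

        System.out.println(latest.value() + " @ " + latest.timestamp()); // b+x @ 30
        System.out.println(past.value() + " @ " + past.timestamp());     // a+x @ 20
    }
}
```

The timestamped lookup is correct here only because both inputs keep their full history; with a non-versioned input there would be no older version to look up.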
What you point out is that this join result may never have been emitted
downstream, and that is true, but the two feel separate to me. In an "ideal"
world I think we would emit all older join results downstream when joining
versioned tables. (It's just not practical from a computation standpoint, and
it's also not clear that users want this by default.) I don't see why the fact
that we did not emit the result downstream means we should not return it if
asked to compute it on the fly. If we have the correct value available, it
seems we should return it.
It does lead to an interesting (and confusing) situation: if the result of a
join between two versioned tables is materialized in a versioned store, then
timestamped gets against that store will not necessarily return correct
results, even though correct results could be obtained by not materializing the
join result and using the value getter instead. The alternative is to say that
this value getter does not support timestamped gets, even though it has a way
to compute them correctly, which feels odd. Curious to hear your thoughts on
this.
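The confusing case can be illustrated with a toy model (again not Kafka Streams code; every name here is hypothetical). It assumes that an out-of-order update to a versioned input is stored in that input's history but does not trigger a downstream join result, which is one way a join result can go unemitted. A versioned store that holds only the emitted results then disagrees with the on-the-fly timestamped join.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model, NOT Kafka Streams code. Assumption: an out-of-order update
// (older than the latest version for its key) is stored in the versioned input but
// does not trigger a downstream join result.
public class MaterializedVsOnTheFly {

    record VT(String value, long timestamp) { }

    static final class VersionedTable {
        private final Map<String, TreeMap<Long, String>> history = new HashMap<>();

        // Stores the version; returns true if it is in order (becomes the latest version).
        boolean put(String key, String value, long ts) {
            TreeMap<Long, String> versions = history.computeIfAbsent(key, k -> new TreeMap<>());
            boolean inOrder = versions.isEmpty() || ts >= versions.lastKey();
            versions.put(ts, value);
            return inOrder;
        }

        // Version valid as of asOf: largest timestamp <= asOf.
        VT get(String key, long asOf) {
            TreeMap<Long, String> versions = history.get(key);
            if (versions == null) return null;
            Map.Entry<Long, String> e = versions.floorEntry(asOf);
            return e == null ? null : new VT(e.getValue(), e.getKey());
        }
    }

    final VersionedTable left = new VersionedTable();
    final VersionedTable right = new VersionedTable();
    // Versioned store holding only the join results that were actually emitted.
    final VersionedTable materialized = new VersionedTable();

    // On-the-fly timestamped inner join: timestamped lookups on both inputs, then join.
    VT joinAsOf(String key, long asOf) {
        VT l = left.get(key, asOf);
        VT r = right.get(key, asOf);
        if (l == null || r == null) return null;
        return new VT(l.value() + "+" + r.value(), Math.max(l.timestamp(), r.timestamp()));
    }

    // Updates one input; only in-order updates emit (and materialize) a join result,
    // computed against the latest version of each side.
    void update(VersionedTable side, String key, String value, long ts) {
        if (side.put(key, value, ts)) {
            VT joined = joinAsOf(key, Long.MAX_VALUE);
            if (joined != null) materialized.put(key, joined.value(), joined.timestamp());
        }
    }

    public static void main(String[] args) {
        MaterializedVsOnTheFly m = new MaterializedVsOnTheFly();
        m.update(m.left, "k", "a", 10);
        m.update(m.right, "k", "x", 20);
        m.update(m.left, "k", "b", 30);
        m.update(m.left, "k", "a2", 15); // out of order: stored, but no join result emitted

        System.out.println(m.materialized.get("k", 25).value()); // a+x
        System.out.println(m.joinAsOf("k", 25).value());         // a2+x
    }
}
```

With these updates, a timestamped get at 25 against the materialized store returns `a+x`, while the on-the-fly join sees the out-of-order `a2` at 15 and returns `a2+x`; for the latest timestamp the two agree.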
--
This is an automated message from the Apache Git Service.