vcrfxia commented on code in PR #13496: URL: https://github.com/apache/kafka/pull/13496#discussion_r1162213072
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long asOfTimestamp) {

Review Comment:
> You say "older join result" -- don't think they would get a join result, would they?

They get what would have been a join result if the join emitted the complete history of older join results (which it does not, for computational expediency). Here's a concrete example to check we're on the same page. Suppose we have an inner join, and all records are for the same key:

```
A: (a5, ts=5)
B: (b1, ts=1)  --> triggers join result (a5, b1) with ts=5
A: (a2, ts=2)  --> no new join result, because this record is out-of-order
```

If the result is not materialized and someone calls `get(k, 2)` on the value getter, the value getter will join `a2` and `b1` on the fly and return `(a2, b1)`, even though this result was never emitted downstream.

I gave this some more thought, and I think this behavior could be desirable, even though I agree with your statement:

> given how the "join processor" works, it basically get a versioned input ktable, and produced a non-versioned ktable and drops out-of-order records. So if we would only expose a value-getter that does not support get(k, ts) it would be reasonable to me.

I think there actually is a situation in which `get(k, ts)` would be called on this join value getter today. Suppose the table-table join result is not materialized and is joined directly to a stream. If the table-table join result is identified as "versioned", the stream-table join will call `get(k, ts)` on the value getter.
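To make the example concrete, here is a minimal, self-contained Java simulation. It deliberately does not use the Kafka Streams API. `VersionedJoinSketch`, `VersionedTable`, `onA`/`onB`, and `get(key, asOfTimestamp)` are hypothetical names.

It assumes a simplified model:
- Each input is a versioned table (a per-key `TreeMap` from timestamp to value).
- The join processor drops a record that is older than the latest version for its key on the same side.
- A join result's timestamp is the max of the two input timestamps.
- The value getter answers `get(k, ts)` by looking up each input as of `ts` and joining on the fly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a versioned inner table-table join; NOT the Kafka Streams API.
public class VersionedJoinSketch {

    // Per-key history of values, keyed by the timestamp from which each version is valid.
    static final class VersionedTable {
        private final Map<String, TreeMap<Long, String>> history = new HashMap<>();

        // Stores the version and reports whether it was in order (not older than the latest version).
        boolean put(String key, String value, long ts) {
            TreeMap<Long, String> versions = history.computeIfAbsent(key, k -> new TreeMap<>());
            boolean inOrder = versions.isEmpty() || ts >= versions.lastKey();
            versions.put(ts, value);
            return inOrder;
        }

        // Version valid at asOf: the entry with the greatest timestamp <= asOf, or null.
        Map.Entry<Long, String> getAsOf(String key, long asOf) {
            TreeMap<Long, String> versions = history.get(key);
            return versions == null ? null : versions.floorEntry(asOf);
        }

        // Latest version for the key, or null if the key has no versions.
        Map.Entry<Long, String> getLatest(String key) {
            TreeMap<Long, String> versions = history.get(key);
            return versions == null ? null : versions.lastEntry();
        }
    }

    record Joined(String value, long timestamp) {}

    final VersionedTable tableA = new VersionedTable();
    final VersionedTable tableB = new VersionedTable();
    final List<Joined> emitted = new ArrayList<>();

    // Inner join: a result exists only if both sides have a value; its timestamp is the max of the inputs.
    static Joined join(Map.Entry<Long, String> a, Map.Entry<Long, String> b) {
        if (a == null || b == null) {
            return null;
        }
        return new Joined("(" + a.getValue() + ", " + b.getValue() + ")", Math.max(a.getKey(), b.getKey()));
    }

    // Processor path: out-of-order records are stored but produce no downstream result.
    void onA(String key, String value, long ts) {
        if (tableA.put(key, value, ts)) {
            emitIfPresent(join(tableA.getLatest(key), tableB.getLatest(key)));
        }
    }

    void onB(String key, String value, long ts) {
        if (tableB.put(key, value, ts)) {
            emitIfPresent(join(tableA.getLatest(key), tableB.getLatest(key)));
        }
    }

    private void emitIfPresent(Joined result) {
        if (result != null) {
            emitted.add(result);
        }
    }

    // Value-getter path: join both inputs as of the requested timestamp, computed on the fly.
    Joined get(String key, long asOfTimestamp) {
        return join(tableA.getAsOf(key, asOfTimestamp), tableB.getAsOf(key, asOfTimestamp));
    }

    public static void main(String[] args) {
        VersionedJoinSketch sketch = new VersionedJoinSketch();
        sketch.onA("k", "a5", 5);
        sketch.onB("k", "b1", 1);
        sketch.onA("k", "a2", 2); // out-of-order: 2 < 5, so nothing is emitted

        // prints "emitted downstream: 1 result(s), first = (a5, b1)"
        System.out.println("emitted downstream: " + sketch.emitted.size()
                + " result(s), first = " + sketch.emitted.get(0).value());
        // prints "get(k, 2) = (a2, b1)"
        System.out.println("get(k, 2) = " + sketch.get("k", 2).value());
    }
}
```

In this model the processor emits only `(a5, b1)`. The as-of lookup for timestamp 2 still produces `(a2, b1)`, the "would-have-been" join result described in the example. A downstream stream-table join that calls `get(k, ts)` would therefore see results that never appear in the table-table join's output.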
This situation is interesting. It would be wrong for the user to explicitly materialize the result of the table-table join in a versioned store and then join it to the stream, because the store would only contain the results the join actually emitted, without the out-of-order updates. But if they do not explicitly materialize the result and instead perform the join directly, they get proper stream-table join semantics via the value getter.

Assuming my understanding is correct, we have two options:

1. Treat the result of a table-table join whose inputs are both versioned, and whose result is not explicitly materialized, as versioned. The value getter then supports versioning, as in the current PR. The stream-(table-table) join uses fully versioned semantics and returns correct results.
2. Treat the result of such a join as not versioned, and update the value getter to reflect this. The stream-(table-table) join then does not use versioned semantics; users need to perform a (stream-table)-table join to get versioned semantics instead.

The first option is nice in that `stream-(table-table)` and `(stream-table)-table` joins without intermediate materialization produce the same results. But it is also confusing, because `stream-(table-table)` produces different results if the user materializes the table-table join result in a versioned store (which is wrong).

WDYT? Now that I feel we've fully discussed all angles, I'm happy with either approach. We do need to make a decision in this PR, though. If I'm right that downstream stream-table joins call `get(k, ts)` whenever the value getter supports it, changing this behavior later would have compatibility implications.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.