vcrfxia commented on code in PR #13496:
URL: https://github.com/apache/kafka/pull/13496#discussion_r1162213072


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -153,11 +154,37 @@ public void init(final ProcessorContext<?, ?> context) {
 
         @Override
         public ValueAndTimestamp<VOut> get(final K key) {
-            final ValueAndTimestamp<V1> valueAndTimestamp1 = 
valueGetter1.get(key);
+            return computeJoin(key, valueGetter1::get, valueGetter2::get);
+        }
+
+        @Override
+        public ValueAndTimestamp<VOut> get(final K key, final long 
asOfTimestamp) {

Review Comment:
   > You say "older join result" -- don't think they would get a join result, 
would they?
   
   They get what should've been a join result, if the join were to emit a 
complete history of older join results (which it does not due to computational 
expediency). Here's a concrete example to check we're on the same page. Suppose 
we have an inner join, and all records are for the same key:
   ```
   A: (a5, ts=5)
   B: (b1, ts=1) --> triggers join result (a5, b1) with ts=5
   A: (a2, ts=2) --> no new join result, because this record is out-of-order
   ```
   If the result is not materialized and someone calls `get(k, 2)` on the value 
getter, then the value getter will join `a2` and `b1` on the fly and return 
`(a2, b1)` even though this was never emitted downstream. 
   
   I gave this some more thought and I think the behavior could be desirable, 
even though I agree with your statement:
   > given how the "join processor" works, it basically get a versioned input 
ktable, and produced a non-versioned ktable and drops out-of-order records. So 
if we would only expose a value-getter that does not support get(k, ts) it 
would be reasonable to me.
   
   I think there actually is a situation in which `get(k, ts)` would be called 
on this join value getter today. If the table-table join result is not 
materialized, and is directly joined to a stream, then if the table-table join 
result is identified as "versioned" then the stream-table join will call 
`get(k, ts)` on the value getter. This situation is really interesting because 
it would be wrong for the user to explicitly materialize the result of the 
table-table join with a versioned store, and then join it to the stream, but if 
they do not explicitly materialize the result and instead perform the join 
directly, then they can get proper stream-table join semantics using the value 
getter.
   
   Assuming my understanding is correct, then we have two options:
   1. Say that the result of a table-table join (where both input tables are 
versioned) where the result is not explicitly materialized is versioned, and 
have the value getter support versioning, as in the current PR. Then the 
stream-(table-table) join uses fully versioned semantics and returns correct 
results.
   2. Say the the result of a table-table join (where both input tables are 
versioned) where the result is not explicitly materialized is not versioned, 
and update the value getter to reflect this. Then the stream-(table-table) join 
does not use versioned semantics. Users need to perform a (stream-table)-table 
join to get versioned semantics instead.
   
   The first option is nice in that now `stream-(table-table)` and 
`(stream-table)-table` joins with no intermediate materialization produce the 
same results, but it's also confusing because `stream-(table-table)` produces 
different results if the user materializes the result of the table-table join 
as a versioned store (which is wrong). 
   
   WDYT? I'm happy with either approach now that I feel like we've discussed 
all angles fully. We do need to make a decision in this PR though, if my 
understanding about `get(k, ts)` being called from downstream stream-table 
joins if it's supported is correct else there will be compatibility 
implications for changing it in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to