[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177112234


##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@ <
 for stateful operations such as aggregations and joins, however, out-of-order data could cause the processing logic to be incorrect. If users want to handle such out-of-order data, generally they need to allow their applications
 to wait for longer time while bookkeeping their states during the wait time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window operators for windowed aggregations to achieve such trade-offs (details can be found in Developer Guide).
-As for Joins, users have to be aware that some of the out-of-order data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned state stores to address concerns with out-of-order data, but out-of-order data will not be handled by default:


- For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly, but the resulted stream may contain unnecessary leftRecord-null for left joins, and leftRecord-null or null-rightRecord for outer joins.
- For Stream-Table joins, out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order), and hence it may produce unpredictable results.
- For Table-Table joins, out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order). However, the join result is a changelog stream and hence will be eventually consistent.
+ For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly, but the resulting stream may contain unnecessary leftRecord-null for left joins, and leftRecord-null or null-rightRecord for outer joins.
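As a rough illustration of the versioned-store approach mentioned in the added text above, a minimal sketch (assuming Kafka Streams 3.5+; the topic names, store name, and retention below are hypothetical) of materializing the table side of a stream-table join with a versioned store:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;

public class VersionedStreamTableJoinSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Table side: materialize with a versioned store so that older versions
        // (within the history retention) remain available for timestamped lookups.
        final KTable<String, String> prices = builder.table(
            "prices",                                           // hypothetical topic
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.<String, String>as(
                Stores.persistentVersionedKeyValueStore("prices-versioned", Duration.ofMinutes(10))));

        final KStream<String, String> orders = builder.stream(
            "orders", Consumed.with(Serdes.String(), Serdes.String()));   // hypothetical topic

        // Each stream record joins with the table version that was current at the
        // stream record's timestamp, rather than simply the latest-by-offset value.
        orders.join(prices, (order, price) -> order + " @ " + price)
              .to("orders-priced", Produced.with(Serdes.String(), Serdes.String()));

        builder.build();
    }
}
```

Note that, per the related change to the stream-table join docs in this PR, a stream record whose timestamp is older than the table's history retention is dropped rather than joined.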

Review Comment:
   Sure. Updated here for now, will open a follow-up for older branches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177112439


##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@ <
 for stateful operations such as aggregations and joins, however, out-of-order data could cause the processing logic to be incorrect. If users want to handle such out-of-order data, generally they need to allow their applications
 to wait for longer time while bookkeeping their states during the wait time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window operators for windowed aggregations to achieve such trade-offs (details can be found in Developer Guide).
-As for Joins, users have to be aware that some of the out-of-order data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned state stores to address concerns with out-of-order data, but out-of-order data will not be handled by default:


- For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly, but the resulted stream may contain unnecessary leftRecord-null for left joins, and leftRecord-null or null-rightRecord for outer joins.
- For Stream-Table joins, out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order), and hence it may produce unpredictable results.
- For Table-Table joins, out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order). However, the join result is a changelog stream and hence will be eventually consistent.
+ For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly, but the resulting stream may contain unnecessary leftRecord-null for left joins, and leftRecord-null or null-rightRecord for outer joins.
+This behavior is the same regardless of whether versioned stores are used.

Review Comment:
   Removed.






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177113372


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -3609,6 +3631,52 @@ KTable-KTable Foreign-Key
   and https://cwiki.apache.org/confluence/x/sQU0BQ"; title="KIP-328">KIP-328.


+
+Using timestamp-based semantics for table processors
+By default, tables in Kafka Streams use offset-based semantics. When multiple records arrive for the same key, the one with the largest record offset
+is considered the latest record for the key, and is the record that appears in aggregation and join results computed on the table. This is true even
+in the event of out-of-order data. The record with the
+largest offset is considered to be the latest record for the key, even if this record does not have the largest timestamp.
+An alternative to offset-based semantics is timestamp-based semantics. With timestamp-based semantics, the record with the largest timestamp is
+considered the latest record, even if there is another record with a larger offset (and smaller timestamp). If there is no out-of-order data (per key),
+then offset-based semantics and timestamp-based semantics are equivalent; the difference only appears when there is out-of-order data.
+Starting with Kafka Streams 3.5, Kafka Streams supports timestamp-based semantics through the use of versioned state stores.
+When a table is materialized with a versioned state store, it is a versioned table and will result in different processor semantics in the presence of out-of-order data.
+
+When performing a stream-table join, stream-side records will join with the latest-by-timestamp table record which has a timestamp less than or equal to
+the stream record's timestamp. This is in contrast to joining a stream to an unversioned table, in which case the latest-by-offset table record will
+be joined, even if the stream-side record is out-of-order and has a lower timestamp.
+Aggregations computed on the table will include the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order
+updates (per key) will not trigger a new aggregation result. This is true for count and reduce operations as well, in addition to aggregate operations.
+Table joins will use the latest-by-timestamp record for each key, instead of the latest-by-offset record. Out-of-order updates (per key) will not
+trigger a new join result. This is true for both primary-key table-table joins and also foreign-key table-table joins. If a
+versioned table is joined with an unversioned table, the result will be the join of the latest-by-timestamp record from the versioned table with
+the latest-by-offset record from the unversioned table.
+Table filter operations will no longer suppress consecutive tombstones, so users may observe more null
+records downstream of the filter than compared to when filtering an unversioned table. This is done in order to preserve a complete version history downstream, in the event of out-of-order data.
+suppress operations are not allowed on versioned tables, as this would collapse the version history and lead to undefined behavior.
+
+Once a table is materialized with a versioned store, downstream tables are also considered versioned until any of the following occurs:
+
+A downstream table is explicitly materialized, either with an unversioned store supplier or with no store supplier (all stores are unversioned by default, including the default store supplier)
+Any stateful transformation occurs, including aggregations and joins
+A table is converted to a stream and back.
+
+The results of certain processors should not be materialized with versioned stores, as these processors do not produce a complete older version history, and therefore materialization as a versioned table would lead to unpredictable results:
+
+Aggregate processors, for both table and stream aggregations. This includes aggregate, count and reduce operations.
+Table-table join processors, including both primary-key and foreign-key joins.
+
+For more on versioned stores and how to start using them in your application, see here.
+
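To make the new section above concrete, a minimal sketch (assuming Kafka Streams 3.5+; the topic, store name, serdes, and retention below are hypothetical) of materializing a versioned table and of the downstream-versioning rules described above:

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class VersionedTableSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Materializing with a versioned store switches this table to timestamp-based
        // semantics: the latest-by-timestamp record per key wins, even if an
        // out-of-order record later arrives with a larger offset.
        final KTable<String, Long> quotes = builder.table(
            "quotes",                                            // hypothetical topic
            Consumed.with(Serdes.String(), Serdes.Long()),
            Materialized.<String, Long>as(
                Stores.persistentVersionedKeyValueStore("quotes-versioned", Duration.ofHours(1))));

        // A downstream table that is not explicitly materialized (e.g. this filter)
        // is still considered versioned.
        final KTable<String, Long> positiveQuotes =
            quotes.filter((symbol, price) -> price != null && price > 0);

        // A stateful transformation such as an aggregation ends the versioned lineage;
        // per the text above, its result should not be materialized with a versioned store.
        final KTable<String, Long> counts = positiveQuotes
            .groupBy((symbol, price) -> KeyValue.pair(symbol, price),
                     Grouped.with(Serdes.String(), Serdes.Long()))
            .count();

        builder.build();
    }
}
```

As the quoted section notes, explicitly materializing a downstream table with an unversioned (or default) store supplier, or converting a table to a stream and back, likewise ends the versioned lineage.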

Review Comment:
   The section that this line links to (processor API topic on versioned stores) mentions that global tables are not allowed to be materialized with versioned stores. I think since most of the nuances

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177112691


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -2830,6 +2846,9 @@ KTable-KTable Foreign-Key



+When the table is versioned,
+the table record to join with is determined by performing a timestamped lookup, i.e., the table record which is joined will be the latest-by-timestamp record with timestamp
+less than or equal to the stream record timestamp. If the stream record timestamp is older than the table's history retention, then the record is dropped.

Review Comment:
   Only the stream-table join performs timestamped lookups; table-table joins drop out-of-order records but only ever call `get(key)` and not `get(key, asOfTimestamp)`.
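To illustrate the two lookup forms mentioned in this comment, a hypothetical Processor API sketch (the processor class and the store name "versioned-store" are illustrative only, not part of this PR):

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;

// Illustrative processor showing the two lookup forms on a versioned store.
public class VersionedLookupSketch implements Processor<String, String, String, String> {
    private VersionedKeyValueStore<String, String> store;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        // "versioned-store" is a hypothetical store name connected to this processor.
        store = context.getStateStore("versioned-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        // Latest version for the key, regardless of the incoming record's timestamp.
        // This is the plain get(key) lookup that table-table joins use.
        final VersionedRecord<String> latest = store.get(record.key());

        // Latest version whose timestamp is less than or equal to the incoming
        // record's timestamp: the timestamped lookup performed by stream-table joins.
        final VersionedRecord<String> asOf = store.get(record.key(), record.timestamp());
    }
}
```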






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13622: KAFKA-14834: [13/N] Docs updates for versioned store semantics

2023-04-25 Thread via GitHub


vcrfxia commented on code in PR #13622:
URL: https://github.com/apache/kafka/pull/13622#discussion_r1177130156


##
docs/streams/core-concepts.html:
##
@@ -328,13 +328,17 @@ <
 for stateful operations such as aggregations and joins, however, out-of-order data could cause the processing logic to be incorrect. If users want to handle such out-of-order data, generally they need to allow their applications
 to wait for longer time while bookkeeping their states during the wait time, i.e. making trade-off decisions between latency, cost, and correctness.
 In Kafka Streams specifically, users can configure their window operators for windowed aggregations to achieve such trade-offs (details can be found in Developer Guide).
-As for Joins, users have to be aware that some of the out-of-order data cannot be handled by increasing on latency and cost in Streams yet:
+As for Joins, users may use versioned state stores to address concerns with out-of-order data, but out-of-order data will not be handled by default:


- For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly, but the resulted stream may contain unnecessary leftRecord-null for left joins, and leftRecord-null or null-rightRecord for outer joins.
- For Stream-Table joins, out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order), and hence it may produce unpredictable results.
- For Table-Table joins, out-of-order records are not handled (i.e., Streams applications don't check for out-of-order records and just process all records in offset order). However, the join result is a changelog stream and hence will be eventually consistent.
+ For Stream-Stream joins, all three types (inner, outer, left) handle out-of-order records correctly, but the resulting stream may contain unnecessary leftRecord-null for left joins, and leftRecord-null or null-rightRecord for outer joins.

Review Comment:
   Here's the PR targeted at 3.4: https://github.com/apache/kafka/pull/13642
   
   Based on the ticket (https://issues.apache.org/jira/browse/KAFKA-10847), it looks like you'll want to backport this all the way to 3.1.


