(kafka) branch trunk updated (a0b716ec9fd -> 306b0e862e1)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from a0b716ec9fd KAFKA-16001: Migrated ConsumerNetworkThreadTest away from ConsumerTestBuilder (#16140)
     add 306b0e862e1 MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

No new revisions were added by this update.

Summary of changes:
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)
(kafka) branch 3.8 updated: MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.8 by this push:
     new 6330447dada MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

commit 6330447dada765723dbbe6a3565a6487ef67d265
Author: Matthias J. Sax
AuthorDate: Thu Jun 13 14:57:47 2024 -0700

    MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

    Reviewers: Bruno Cadonna, Jim Galasyn
---
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 0f819cb384f..13cebb727f0 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html

The added section reads (markup and diff prefixes stripped for readability):

Streams API changes in 3.2.0

RocksDB offers many metrics that are critical for monitoring and tuning its performance. Kafka Streams started to
make RocksDB metrics accessible like any other Kafka metric via KIP-471
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams)
in the 2.4.0 release. However, the KIP was only partially implemented at the time, and is now completed with the
3.2.0 release. For a full list of available RocksDB metrics, please consult the monitoring documentation.

Kafka Streams ships with RocksDB and in-memory store implementations, and users can pick which one to use.
However, for the DSL the choice is made per operator, which makes it cumbersome to switch from the default RocksDB
store to the in-memory store for all operators, especially for larger topologies.
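The RocksDB metrics mentioned above are reported like any other Kafka Streams metric, gated by the client's
metrics recording level. A minimal configuration sketch (application id and bootstrap servers are placeholders,
and the recording levels stated in the comments are assumptions to verify against the monitoring docs):

```properties
# Hypothetical Kafka Streams client configuration.
application.id=rocksdb-metrics-demo
bootstrap.servers=localhost:9092

# Raise the recording level so that debug-level metrics -- which, per KIP-471,
# include the statistics-based RocksDB metrics -- are collected in addition
# to the info-level metrics reported by default.
metrics.recording.level=DEBUG
```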
KIP-591 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store)
adds a new config default.dsl.store that enables setting the default store for all DSL operators globally.
Note that you must pass a TopologyConfig to the StreamsBuilder constructor to make use of this new config.

For multi-AZ deployments, it is desirable to assign StandbyTasks to a KafkaStreams instance running in a different
AZ than the corresponding active StreamTask.
KIP-708 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+StandbyTask+assignment+for+Kafka+Streams)
enables configuring Kafka Streams instances with a rack-aware StandbyTask assignment strategy, using the newly
added config rack.aware.assignment.tags and the corresponding client.tag.<myTag> configs.

KIP-791 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context)
adds a new method Optional<RecordMetadata> StateStoreContext.recordMetadata() to expose record metadata.
This helps, for example, to provide read-your-writes consistency guarantees in interactive queries.

Interactive Queries allow users to tap into the operational state of Kafka Streams processor nodes. The existing
API is tightly coupled with the actual state store interfaces, and thus with the internal implementation of state
stores. To break up this tight coupling and allow for building more advanced IQ features,
KIP-796 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2) introduces
a completely new IQv2 API, via the StateQueryRequest and StateQueryResult classes,
as well as the Query and QueryResult interfaces (plus additional helper classes).
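As a rough sketch (not an example from the docs), a key lookup through the new IQv2 classes might look as
follows; the store name "counts-store", the key/value types, and the surrounding class are all hypothetical:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

public class IqV2Sketch {

    // Look up the current count for `key` in a key-value store named
    // "counts-store" (hypothetical name) owned by a running KafkaStreams
    // instance. Returns null if the key is not present.
    static Long lookup(final KafkaStreams streams, final String key) {
        final StateQueryRequest<Long> request =
            StateQueryRequest.inStore("counts-store")
                .withQuery(KeyQuery.<String, Long>withKey(key));

        final StateQueryResult<Long> result = streams.query(request);

        // With a key query, only the partition owning the key answers.
        return result.getOnlyPartitionResult().getResult();
    }
}
```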
In addition, multiple built-in query types were added: KeyQuery for key lookups and RangeQuery
(via https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+over+kv-store+in+IQv2)
for key-range queries on key-value stores, as well as WindowKeyQuery and WindowRangeQuery
(via https://cwiki.apache.org/confluence/display/KAFKA/KIP-806%3A+Add+session+and+window+query+over+kv-store+in+IQv2)
for key and range lookups into windowed stores.

The Kafka Streams DSL may insert so-called repartition topics for certain DSL operators to ensure correct
partitioning of data. These topics are configured with infinite retention, and Kafka Streams purges old data
explicitly via "delete record" requests when committing input topic offsets.
KIP-811 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+config+repartition.purge.interval.ms+to+Kafka+Streams)
adds a new config repartition.purge.interval.ms that allows configuring how frequently these delete-record
requests are sent, independently of the commit interval.
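Pulling the 3.2.0 configs above together, a hypothetical Streams configuration fragment might look like the
following (all values are placeholders, and the tag name "zone" is invented for illustration):

```properties
# KIP-591: use in-memory stores for all DSL operators by default
# (requires passing a TopologyConfig to the StreamsBuilder constructor).
default.dsl.store=in_memory

# KIP-708: rack-aware standby assignment over a "zone" tag
# (tag name and value are placeholders).
rack.aware.assignment.tags=zone
client.tag.zone=us-east-1a

# KIP-811: interval (in milliseconds; placeholder value) between the
# "delete record" requests that purge repartition topics.
repartition.purge.interval.ms=30000
```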
(kafka) branch 3.7 updated: MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.7 by this push:
     new f9001b5a464 MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

commit f9001b5a464ebfc0e124c05e0cc7beda43e320b6
Author: Matthias J. Sax
AuthorDate: Thu Jun 13 14:57:47 2024 -0700

    MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

    Reviewers: Bruno Cadonna, Jim Galasyn
---
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 14544077403..6088524464a 100644

(The added "Streams API changes in 3.2.0" section is identical to the one shown for branch 3.8 above.)
(kafka) branch 3.6 updated: MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.6 by this push:
     new a4e5b073a9a MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

commit a4e5b073a9afbe40af52530f73ee984f70ee4112
Author: Matthias J. Sax
AuthorDate: Thu Jun 13 14:57:47 2024 -0700

    MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

    Reviewers: Bruno Cadonna, Jim Galasyn
---
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 6f40747d233..b4f3f3804af 100644

(The added "Streams API changes in 3.2.0" section is identical to the one shown for branch 3.8 above.)
(kafka) branch 3.5 updated: MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.5 by this push:
     new 81e4b682733 MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

commit 81e4b682733dd0257367e733762b87b6f921061c
Author: Matthias J. Sax
AuthorDate: Thu Jun 13 14:57:47 2024 -0700

    MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

    Reviewers: Bruno Cadonna, Jim Galasyn
---
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index c9c689e36c1..9ebd9432465 100644

(The added "Streams API changes in 3.2.0" section is identical to the one shown for branch 3.8 above.)
(kafka) branch 3.4 updated: MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.4 by this push:
     new 6e769f861ef MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

commit 6e769f861ef601245f01aa4d1b583076140ddb0c
Author: Matthias J. Sax
AuthorDate: Thu Jun 13 14:57:47 2024 -0700

    MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

    Reviewers: Bruno Cadonna, Jim Galasyn
---
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 39be48975e4..f36f1bad078 100644

(The added "Streams API changes in 3.2.0" section is identical to the one shown for branch 3.8 above.)
(kafka) branch 3.3 updated: MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.3 by this push:
     new 24d7e38b545 MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

commit 24d7e38b5453ab81f62568c756d6de95a70dbc10
Author: Matthias J. Sax
AuthorDate: Thu Jun 13 14:57:47 2024 -0700

    MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

    Reviewers: Bruno Cadonna, Jim Galasyn
---
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index bba7068bef7..8bec3a5504b 100644

(The added "Streams API changes in 3.2.0" section is identical to the one shown for branch 3.8 above.)
(kafka) branch 3.2 updated: MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.2 by this push:
     new e4ca0666802 MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

commit e4ca066680296ea29d443efb626baecc837083f6
Author: Matthias J. Sax
AuthorDate: Thu Jun 13 14:57:47 2024 -0700

    MINOR: update Kafka Streams docs with 3.2 KIP information (#16313)

    Reviewers: Bruno Cadonna, Jim Galasyn
---
 docs/streams/upgrade-guide.html | 55 +
 1 file changed, 55 insertions(+)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 06aee3c6995..78c1a584421 100644

(The added "Streams API changes in 3.2.0" section is identical to the one shown for branch 3.8 above.)
(kafka) branch trunk updated: MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 92614699e6e MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) 92614699e6e is described below commit 92614699e6e64a631bf1f4ea35f4ff28bf17e4c9 Author: Matthias J. Sax AuthorDate: Thu Jun 13 15:17:00 2024 -0700 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) Reviewers: Lucas Brutschy , Jim Galasyn --- docs/ops.html | 27 +--- docs/streams/upgrade-guide.html | 56 + 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/docs/ops.html b/docs/ops.html index 5df69b09f04..83f0e35ba20 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3100,10 +3100,11 @@ active-process-ratio metrics which have a recording level of info: Processor Node Metrics The following metrics are only available on certain types of nodes, i.e., the process-* metrics are only available for - source processor nodes, the suppression-emit-* metrics are only available for suppression operation nodes, and the - record-e2e-latency-* metrics are only available for source processor nodes and terminal nodes (nodes without successor + source processor nodes, the suppression-emit-* metrics are only available for suppression operation nodes, + emit-final-* metrics are only available for windowed aggregations nodes, and the + record-e2e-latency-* metrics are only available for source processor nodes and terminal nodes (nodes without successor nodes). 
- All of the metrics have a recording level of debug, except for the record-e2e-latency-* metrics which have + All of the metrics have a recording level of debug, except for the record-e2e-latency-* metrics which have a recording level of info: @@ -3142,6 +3143,26 @@ active-process-ratio metrics which have a recording level of info: The total number of records that have been emitted downstream from suppression operation nodes. kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) + +emit-final-latency-max +The max latency to emit final records when a record could be emitted. + kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) + + +emit-final-latency-avg +The avg latency to emit final records when a record could be emitted. + kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) + + +emit-final-records-rate +The rate of records emitted when records could be emitted. + kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) + + +emit-final-records-total +The total number of records emitted. + kafka.streams:type=stream-processor-node-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+) + record-e2e-latency-avg The average end-to-end latency of a record, measured by comparing the record timestamp with the system time when it has been fully processed by the node. diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 13cebb727f0..1f0647931bd 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -303,6 +303,62 @@ adds a new config default.client.supplier that allows to use a custom KafkaClientSupplier without any code changes. +Streams API changes in 3.3.0 + + Kafka Streams does not send a "leave group" request when an instance is closed. 
This behavior implies
+ that a rebalance is delayed until max.poll.interval.ms has passed.
+ https://cwiki.apache.org/confluence/display/KAFKA/KIP-812%3A+Introduce+another+form+of+the+%60KafkaStreams.close%28%29%60+API+that+forces+the+member+to+leave+the+consumer+group">KIP-812
+ introduces a new KafkaStreams.close(CloseOptions) overload, which allows forcing an instance to leave the
+ group immediately.
+
+ Note: Due to internal limitations, CloseOptions only works for static consumer groups at this point
+ (cf. https://issues.apache.org/jira/browse/KAFKA-16514">KAFKA-16514 for more details and a fix in
+ some future release).
+
+
+
+ https://cwiki.apache.org/confluence/display/KAFKA/KIP-820%3A+Extend+KStream+process+with+new+Processor+API">KIP-820
+ adapts the PAPI type-safety improvement of KIP-478 into the DSL. The existing methods KStream.transform,
+ KStream.flatTransform, KStream.transformValues, and KStream.flatTransformValues
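The KafkaStreams.close(CloseOptions) overload from KIP-812 described above can be used roughly as follows — a sketch assuming a running KafkaStreams instance:

```java
import java.time.Duration;

import org.apache.kafka.streams.KafkaStreams;

public class ForcedLeaveExample {

    // Shut down and force the instance to leave the consumer group immediately,
    // so the group rebalances right away instead of waiting for
    // max.poll.interval.ms to pass. Per KAFKA-16514, leaveGroup(true) currently
    // only takes effect for static group members (group.instance.id set).
    static void shutdown(final KafkaStreams streams) {
        final KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
            .timeout(Duration.ofSeconds(30))
            .leaveGroup(true);
        streams.close(options);
    }
}
```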
(kafka) branch 3.8 updated: MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 3e9fe3a6798 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) 3e9fe3a6798 is described below commit 3e9fe3a67981910864d8f37b7b569d3f4e56a333 Author: Matthias J. Sax AuthorDate: Thu Jun 13 15:17:00 2024 -0700 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) Reviewers: Lucas Brutschy , Jim Galasyn ---
 docs/ops.html                   | 27 +---
 docs/streams/upgrade-guide.html | 56 +
 2 files changed, 80 insertions(+), 3 deletions(-)
The changes are the same as in the trunk commit for #16316 above.
(kafka) branch 3.6 updated: MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new 795c252a3fb MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) 795c252a3fb is described below commit 795c252a3fb6870d87a254227e59e335dc23eda1 Author: Matthias J. Sax AuthorDate: Thu Jun 13 15:17:00 2024 -0700 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) Reviewers: Lucas Brutschy , Jim Galasyn ---
 docs/ops.html                   | 27 +---
 docs/streams/upgrade-guide.html | 56 +
 2 files changed, 80 insertions(+), 3 deletions(-)
The changes are the same as in the trunk commit for #16316 above.
(kafka) branch 3.7 updated: MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.7 by this push: new 65cbd478e8b MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) 65cbd478e8b is described below commit 65cbd478e8b8014cbb35db8f0acdbe659091bd65 Author: Matthias J. Sax AuthorDate: Thu Jun 13 15:17:00 2024 -0700 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) Reviewers: Lucas Brutschy , Jim Galasyn ---
 docs/ops.html                   | 27 +---
 docs/streams/upgrade-guide.html | 56 +
 2 files changed, 80 insertions(+), 3 deletions(-)
The changes are the same as in the trunk commit for #16316 above.
(kafka) branch 3.5 updated: MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.5 by this push: new fe0cbc3d63d MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) fe0cbc3d63d is described below commit fe0cbc3d63dd957b16ae1d8b67457775ebae2adf Author: Matthias J. Sax AuthorDate: Thu Jun 13 15:17:00 2024 -0700 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) Reviewers: Lucas Brutschy , Jim Galasyn ---
 docs/ops.html                   | 27 +---
 docs/streams/upgrade-guide.html | 56 +
 2 files changed, 80 insertions(+), 3 deletions(-)
The changes are the same as in the trunk commit for #16316 above.
(kafka) branch 3.4 updated: MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new f2e78fc7b85 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) f2e78fc7b85 is described below commit f2e78fc7b85a32aec848290d8a4884ed91a00b76 Author: Matthias J. Sax AuthorDate: Thu Jun 13 15:17:00 2024 -0700 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) Reviewers: Lucas Brutschy , Jim Galasyn ---
 docs/ops.html                   | 27 +---
 docs/streams/upgrade-guide.html | 56 +
 2 files changed, 80 insertions(+), 3 deletions(-)
The changes are the same as in the trunk commit for #16316 above.
(kafka) branch 3.3 updated: MINOR: update Kafka Streams docs with 3.3 KIP information (#16316)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.3 by this push: new 05694da5d4e MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) 05694da5d4e is described below commit 05694da5d4e64b1af49f8eccd25ce72d9b436dc4 Author: Matthias J. Sax AuthorDate: Thu Jun 13 15:17:00 2024 -0700 MINOR: update Kafka Streams docs with 3.3 KIP information (#16316) Reviewers: Lucas Brutschy , Jim Galasyn ---
 docs/ops.html                   | 27 +---
 docs/streams/upgrade-guide.html | 56 +
 2 files changed, 80 insertions(+), 3 deletions(-)
The changes are the same as in the trunk commit for #16316 above.
(kafka) branch trunk updated (133f2b0f311 -> dfe0fcfe28a)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 133f2b0f311 KAFKA-16879 SystemTime should use singleton mode (#16266) add dfe0fcfe28a MINOR: Change KStreamKstreamOuterJoinTest to use distinct left and right types (#15513) No new revisions were added by this update. Summary of changes: .../internals/KStreamKStreamOuterJoinTest.java | 546 +++-- 1 file changed, 274 insertions(+), 272 deletions(-)
(kafka) branch trunk updated: KAFKA-16955: fix synchronization of streams threadState (#16337)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 9a239c6142a KAFKA-16955: fix synchronization of streams threadState (#16337) 9a239c6142a is described below commit 9a239c6142a8f2eb36f1600d1012224c31e58e71 Author: Rohan AuthorDate: Fri Jun 14 10:44:36 2024 -0700 KAFKA-16955: fix synchronization of streams threadState (#16337) Each KafkaStreams instance maintains a map from threadId to state, used to aggregate an overall KafkaStreams app state. The map is updated on every state change and when a new thread is created. State-change updates are done in synchronized blocks, but the update that happens on thread creation is not, which can raise a ConcurrentModificationException. This patch moves that update into the listener object and protects it using the object's lock. It also moves ownership of the state map into the listener, so that it is less likely that future changes access it without locking. Reviewers: Matthias J.
Sax --- .../org/apache/kafka/streams/KafkaStreams.java | 64 ++ 1 file changed, 30 insertions(+), 34 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 9cf91f163e2..fcc999e92af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -176,7 +176,6 @@ public class KafkaStreams implements AutoCloseable {
     private final long totalCacheSize;
     private final StreamStateListener streamStateListener;
     private final DelegatingStateRestoreListener delegatingStateRestoreListener;
-    private final Map<Long, StreamThread.State> threadState;
     private final UUID processId;
     private final KafkaClientSupplier clientSupplier;
     protected final TopologyMetadata topologyMetadata;
@@ -633,17 +632,13 @@ public class KafkaStreams implements AutoCloseable {
     /**
      * Class that handles stream thread transitions
      */
-    final class StreamStateListener implements StreamThread.StateListener {
+    private final class StreamStateListener implements StreamThread.StateListener {
         private final Map<Long, StreamThread.State> threadState;
         private GlobalStreamThread.State globalThreadState;
-        // this lock should always be held before the state lock
-        private final Object threadStatesLock;

-        StreamStateListener(final Map<Long, StreamThread.State> threadState,
-                            final GlobalStreamThread.State globalThreadState) {
-            this.threadState = threadState;
+        StreamStateListener(final GlobalStreamThread.State globalThreadState) {
+            this.threadState = new HashMap<>();
             this.globalThreadState = globalThreadState;
-            this.threadStatesLock = new Object();
         }

     /**
@@ -675,33 +670,35 @@ public class KafkaStreams implements AutoCloseable {
         public synchronized void onChange(final Thread thread,
                                           final ThreadStateTransitionValidator abstractNewState,
                                           final ThreadStateTransitionValidator abstractOldState) {
-            synchronized (threadStatesLock) {
-                // StreamThreads first
-                if (thread instanceof StreamThread) {
-                    final StreamThread.State newState = (StreamThread.State) abstractNewState;
-                    threadState.put(thread.getId(), newState);
-
-                    if (newState == StreamThread.State.PARTITIONS_REVOKED || newState == StreamThread.State.PARTITIONS_ASSIGNED) {
-                        setState(State.REBALANCING);
-                    } else if (newState == StreamThread.State.RUNNING) {
-                        maybeSetRunning();
-                    }
-                } else if (thread instanceof GlobalStreamThread) {
-                    // global stream thread has different invariants
-                    final GlobalStreamThread.State newState = (GlobalStreamThread.State) abstractNewState;
-                    globalThreadState = newState;
-
-                    if (newState == GlobalStreamThread.State.RUNNING) {
-                        maybeSetRunning();
-                    } else if (newState == GlobalStreamThread.State.DEAD) {
-                        if (state != State.PENDING_SHUTDOWN) {
-                            log.error("Global thread has died. The streams application or client will now close to ERROR.");
-                            closeToError();
-                        }
+            // StreamThreads first
+            if (thread instanceof StreamThread) {
+                final StreamThread.State newState = (StreamThread.Sta
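The fix described in this commit (the listener owning the thread-state map and guarding every mutation with the listener's own monitor) can be illustrated with a stripped-down, Kafka-free sketch. The class and state names below are simplified stand-ins, not the real KafkaStreams internals.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the patched StreamStateListener: the listener owns
// the map, and every access goes through its monitor, so a newly created
// thread registering itself can never race a concurrent state-change update.
final class ThreadStateListener {
    private final Map<Long, String> threadState = new HashMap<>();

    // Called when a new processing thread is created (previously unguarded).
    public synchronized void register(long threadId) {
        threadState.put(threadId, "CREATED");
    }

    // Called on every state transition (already guarded before the fix).
    public synchronized void onChange(long threadId, String newState) {
        threadState.put(threadId, newState);
    }

    public synchronized int size() {
        return threadState.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadStateListener listener = new ThreadStateListener();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            final long id = i;
            threads[i] = new Thread(() -> {
                listener.register(id);
                listener.onChange(id, "RUNNING");
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(listener.size()); // prints 8
    }
}
```

Because registration and notification now share one lock object, there is no window where the map is mutated while another thread reads it, which is exactly the ConcurrentModificationException window the patch closes.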
(kafka) branch trunk updated: MINOR: update Kafka Streams docs with 3.4 KIP information (#16336)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f80deeabbcf MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) f80deeabbcf is described below commit f80deeabbcf92dda415e7b8eb10ff3313988e893 Author: Matthias J. Sax AuthorDate: Fri Jun 14 15:01:35 2024 -0700 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) Reviewers: Jim Galasyn, Bill Bejeck --- docs/ops.html | 10 ++ docs/streams/upgrade-guide.html | 29 + 2 files changed, 39 insertions(+)

diff --git a/docs/ops.html b/docs/ops.html index 83f0e35ba20..f161cba58b9 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3095,6 +3095,16 @@ The fraction of time the stream thread spent on processing this task among all assigned active tasks. MBean: kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)

New task-level metric rows added, each with MBean kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+):
+ input-buffer-bytes-total: The total number of bytes accumulated by this task.
+ cache-size-bytes-total: The cache size in bytes accumulated by this task.

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 1f0647931bd..127cb99c3b1 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -303,6 +303,35 @@ adds a new config default.client.supplier that allows using a custom KafkaClientSupplier without any code changes.

+Streams API changes in 3.4.0

+ KIP-770 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390) deprecates the config cache.max.bytes.buffering in favor of the newly introduced config statestore.cache.max.bytes. To improve monitoring, two new metrics, input-buffer-bytes-total and cache-size-bytes-total, were added at the DEBUG level. Note that the KIP is only partially implemented in the 3.4.0 release, and the config input.buffer.max.bytes is not available yet.

+ KIP-873 (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356) enables you to multicast result records to multiple partitions of downstream sink topics and adds functionality for choosing to drop result records without sending them. The Integer StreamPartitioner.partition() method is deprecated and replaced by the newly added Optional<Set<Integer>> StreamPartitioner.partitions() method, which enables returning a set of partitions to send the record to.

+ KIP-862 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join+optimization+for+stream-stream+joins) adds a DSL optimization for stream-stream self-joins. The optimization is enabled via a new option, single.store.self.join, which can be set via the existing config topology.optimization. If enabled, the DSL will use a different join processor implementation that uses a single RocksDB store instead of two, to avoid unnecessary data duplication for the self-join case.

+ KIP-865 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-865%3A+Support+--bootstrap-server+in+kafka-streams-application-reset) updates the Kafka Streams application reset tool's server parameter name to conform to the other Kafka tooling by deprecating the --bootstrap-servers parameter and introducing a new --bootstrap-server parameter in its place.

Streams API changes in 3.3.0 Kafka Streams does not send a "leave group" request when an instance is closed. This behavior implies
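The multicast/drop semantics introduced by KIP-873 can be sketched without Kafka on the classpath by mirroring the shape of the new method. The `MulticastPartitioner` interface and the `FanOutPartitioner` example below are hypothetical stand-ins (only the `Optional<Set<Integer>>` return contract follows the KIP: an empty Optional means "use default partitioning", an empty set means "drop the record", and a non-empty set means "send to all of these partitions"):

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Pure-JDK sketch mirroring KIP-873's StreamPartitioner.partitions() shape.
interface MulticastPartitioner<K, V> {
    Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);
}

// Hypothetical example partitioner: drops keyless records, and multicasts
// every other record to its hashed partition plus partition 0.
final class FanOutPartitioner implements MulticastPartitioner<String, String> {
    @Override
    public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
        if (key == null) {
            return Optional.of(Set.of());          // empty set: drop the record
        }
        Set<Integer> targets = new TreeSet<>();
        targets.add(Math.abs(key.hashCode()) % numPartitions);
        targets.add(0);                            // also mirror to partition 0
        return Optional.of(targets);               // non-empty set: multicast
    }

    public static void main(String[] args) {
        FanOutPartitioner p = new FanOutPartitioner();
        assert p.partitions("t", null, "v", 4).orElseThrow().isEmpty();  // dropped
        assert p.partitions("t", "k", "v", 4).orElseThrow().contains(0); // multicast
        System.out.println("ok");
    }
}
```

A real implementation would implement Kafka's StreamPartitioner instead, but the decision logic (empty set vs. non-empty set vs. empty Optional) is the same.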
(kafka) branch 3.8 updated: MINOR: update Kafka Streams docs with 3.4 KIP information (#16336)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 7435dfaa97d MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) 7435dfaa97d is described below commit 7435dfaa97d8a5e3ebd52e98139c14076c2d844a Author: Matthias J. Sax AuthorDate: Fri Jun 14 15:01:35 2024 -0700 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) Reviewers: Jim Galasyn, Bill Bejeck Summary of changes: docs/ops.html | 10 ++ docs/streams/upgrade-guide.html | 29 + 2 files changed, 39 insertions(+)
(kafka) branch 3.7 updated: MINOR: update Kafka Streams docs with 3.4 KIP information (#16336)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.7 by this push: new 8053830a478 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) 8053830a478 is described below commit 8053830a4788226e1b517419452bbdd7c7a8087b Author: Matthias J. Sax AuthorDate: Fri Jun 14 15:01:35 2024 -0700 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) Reviewers: Jim Galasyn, Bill Bejeck Summary of changes: docs/ops.html | 10 ++ docs/streams/upgrade-guide.html | 29 + 2 files changed, 39 insertions(+)
(kafka) branch 3.6 updated: MINOR: update Kafka Streams docs with 3.4 KIP information (#16336)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new 35f0eeb2b07 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) 35f0eeb2b07 is described below commit 35f0eeb2b07cabba0287b1a83a124abfb15e6745 Author: Matthias J. Sax AuthorDate: Fri Jun 14 15:01:35 2024 -0700 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) Reviewers: Jim Galasyn, Bill Bejeck Summary of changes: docs/ops.html | 10 ++ docs/streams/upgrade-guide.html | 29 + 2 files changed, 39 insertions(+)
(kafka) branch 3.5 updated: MINOR: update Kafka Streams docs with 3.4 KIP information (#16336)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.5 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.5 by this push: new f2e3335300e MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) f2e3335300e is described below commit f2e3335300e18f674ff2a617f93062589d128a2c Author: Matthias J. Sax AuthorDate: Fri Jun 14 15:01:35 2024 -0700 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) Reviewers: Jim Galasyn, Bill Bejeck Summary of changes: docs/ops.html | 10 ++ docs/streams/upgrade-guide.html | 29 + 2 files changed, 39 insertions(+)
(kafka) branch 3.4 updated: MINOR: update Kafka Streams docs with 3.4 KIP information (#16336)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.4 by this push: new 65e223a76c4 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) 65e223a76c4 is described below commit 65e223a76c40d712b0f2ecef3911ad927d8ad2df Author: Matthias J. Sax AuthorDate: Fri Jun 14 15:01:35 2024 -0700 MINOR: update Kafka Streams docs with 3.4 KIP information (#16336) Reviewers: Jim Galasyn, Bill Bejeck Summary of changes: docs/ops.html | 10 ++ docs/streams/upgrade-guide.html | 29 + 2 files changed, 39 insertions(+)
(kafka-site) branch update-streams-upgrade-guide-32-33-34 created (now 53e122c9)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch update-streams-upgrade-guide-32-33-34 in repository https://gitbox.apache.org/repos/asf/kafka-site.git at 53e122c9 MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information This branch includes the following new commits: new 53e122c9 MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(kafka-site) 01/01: MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch update-streams-upgrade-guide-32-33-34 in repository https://gitbox.apache.org/repos/asf/kafka-site.git commit 53e122c99b48ae4ccfdb465714184c53c4bc85e5 Author: Matthias J. Sax AuthorDate: Fri Jun 14 15:15:55 2024 -0700 MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information Porting three PRs from `trunk` directly: - https://github.com/apache/kafka/pull/16313 - https://github.com/apache/kafka/pull/16316 - https://github.com/apache/kafka/pull/16336 --- 32/streams/upgrade-guide.html | 54 + 33/ops.html | 27 - 33/streams/upgrade-guide.html | 105 + 34/ops.html | 39 +++-- 34/streams/upgrade-guide.html | 131 ++ 35/ops.html | 39 +++-- 35/streams/upgrade-guide.html | 131 ++ 36/ops.html | 39 +++-- 36/streams/upgrade-guide.html | 131 ++ 37/ops.html | 39 +++-- 37/streams/upgrade-guide.html | 131 ++ 11 files changed, 847 insertions(+), 19 deletions(-)

diff --git a/32/streams/upgrade-guide.html b/32/streams/upgrade-guide.html index a6c1d211..bdc0a275 100644 --- a/32/streams/upgrade-guide.html +++ b/32/streams/upgrade-guide.html @@ -105,6 +105,60 @@ More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION can be found in KIP-295 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization).

+Streams API changes in 3.2.0

+ RocksDB offers many metrics which are critical to monitor and tune its performance. Kafka Streams started to make RocksDB metrics accessible like any other Kafka metric via KIP-471 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams) in the 2.4.0 release. However, the KIP was only partially implemented, and is now completed with the 3.2.0 release. For a full list of available RocksDB metrics, please consult the monitoring documentation.

+ Kafka Streams ships with RocksDB and in-memory store implementations, and users can pick which one to use. However, for the DSL, the choice is a per-operator one, making it cumbersome to switch from the default RocksDB store to the in-memory store for all operators, especially for larger topologies. KIP-591 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store) adds a new config default.dsl.store that enables setting the default store for all DSL operators globally. Note that it is required to pass TopologyConfig to the StreamsBuilder constructor to make use of this new config.

+ For multi-AZ deployments, it is desirable to assign StandbyTasks to a KafkaStreams instance running in a different AZ than the corresponding active StreamTask. KIP-708 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+StandbyTask+assignment+for+Kafka+Streams) enables configuring Kafka Streams instances with a rack-aware StandbyTask assignment strategy, by using the newly added config rack.aware.assignment.tags and the corresponding client.tag.<myTag> configs.

+ KIP-791 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context) adds a new method Optional<RecordMetadata> StateStoreContext.recordMetadata() to expose record metadata. This helps, for example, to provide read-your-writes consistency guarantees in interactive queries.

+ Interactive Queries allow users to tap into the operational state of Kafka Streams processor nodes. The existing API is tightly coupled with the actual state store interfaces and thus with the internal implementation of state stores. To break up this tight coupling and allow for building more advanced IQ features, KIP-796 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2) introduces a completely new IQv2 API, via the StateQueryRequest and StateQueryResult classes, as well as the Query and QueryResult interfaces (plus additional helper classes). In addition, multiple built-in query types were added: KeyQuery for key lookups and RangeQuery (via KIP-805, https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+over+kv-store+in+IQv2) for key-range queries on key-value stores, as well as WindowKeyQuery and WindowRangeQuery (via https://cwiki.a
(kafka-site) branch asf-site updated: MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information (#607)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 356cf148 MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information (#607) 356cf148 is described below commit 356cf1488618c23d5d97c0d23cd908f24a872dab Author: Matthias J. Sax AuthorDate: Mon Jun 17 08:33:42 2024 -0700 MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information (#607) Porting three PR from `trunk: - https://github.com/apache/kafka/pull/16313 - https://github.com/apache/kafka/pull/16316 - https://github.com/apache/kafka/pull/16336 --- 32/streams/upgrade-guide.html | 54 + 33/ops.html | 27 - 33/streams/upgrade-guide.html | 105 + 34/ops.html | 39 +++-- 34/streams/upgrade-guide.html | 131 ++ 35/ops.html | 39 +++-- 35/streams/upgrade-guide.html | 131 ++ 36/ops.html | 39 +++-- 36/streams/upgrade-guide.html | 131 ++ 37/ops.html | 39 +++-- 37/streams/upgrade-guide.html | 131 ++ 11 files changed, 847 insertions(+), 19 deletions(-) diff --git a/32/streams/upgrade-guide.html b/32/streams/upgrade-guide.html index a6c1d211..bdc0a275 100644 --- a/32/streams/upgrade-guide.html +++ b/32/streams/upgrade-guide.html @@ -105,6 +105,60 @@ More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION can be found in https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization";>KIP-295. +Streams API changes in 3.2.0 + + RocksDB offers many metrics which are critical to monitor and tune its performance. Kafka Streams started to make RocksDB metrics accessible + like any other Kafka metric via https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams";>KIP-471 in 2.4.0 release. + However, the KIP was only partially implemented, and is now completed with the 3.2.0 release. 
+ For a full list of available RocksDB metrics, please consult the monitoring documentation. + + + + Kafka Streams ships with RocksDB and in-memory store implementations and users can pick which one to use. + However, for the DSL, the choice is a per-operator one, making it cumbersome to switch from the default RocksDB + store to the in-memory store for all operators, especially for larger topologies. + KIP-591 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+state+store) + adds a new config default.dsl.store that enables setting the default store for all DSL operators globally. + Note that it is required to pass TopologyConfig to the StreamsBuilder constructor to make use of this new config. + + + + For multi-AZ deployments, it is desirable to assign StandbyTasks to a KafkaStreams instance running in a different + AZ than the corresponding active StreamTask. + KIP-708 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-708%3A+Rack+aware+StandbyTask+assignment+for+Kafka+Streams) + enables configuring Kafka Streams instances with a rack-aware StandbyTask assignment strategy, by using the newly added configs + rack.aware.assignment.tags and the corresponding client.tag.<myTag>. + + + + KIP-791 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-791%3A+Add+Record+Metadata+to+State+Store+Context) + adds a new method Optional<RecordMetadata> StateStoreContext.recordMetadata() to expose + record metadata. This helps, for example, to provide read-your-writes consistency guarantees in interactive queries. + + + + Interactive Queries allow users to + tap into the operational state of Kafka Streams processor nodes. The existing API is tightly coupled with the + actual state store interfaces and thus the internal implementation of state stores. 
To break up this tight coupling + and allow for building more advanced IQ features, + KIP-796 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2) introduces + a completely new IQv2 API, via StateQueryRequest and StateQueryResult classes, + as well as Query and QueryResult interfaces (plus additional helper classes). + In addition, multiple built-in query types were added: KeyQuery for key lookups and + RangeQuery (via https://cwiki.apache.org/confluence/display/KAFKA/KIP-805%3A+Add+range+and+scan+query+over
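As a companion to the KIP-708 description quoted in this commit, here is a hedged sketch of the per-instance configuration. The keys rack.aware.assignment.tags and client.tag.<myTag> come from the quoted upgrade-guide text; the tag names (cluster, zone) and their values are hypothetical.

```java
import java.util.Properties;

public class RackAwareStandbyConfig {
    // Configures one KafkaStreams instance with client tags describing its
    // location; the assignor then prefers placing a StandbyTask on an instance
    // whose tag values differ from those of the active task's instance.
    static Properties instanceConfig(String cluster, String zone) {
        Properties props = new Properties();
        props.put("num.standby.replicas", "1");
        // Each instance declares its own tag values (KIP-708):
        props.put("client.tag.cluster", cluster);
        props.put("client.tag.zone", zone);
        // Tags the assignor should use to spread standbys across instances:
        props.put("rack.aware.assignment.tags", "cluster,zone");
        return props;
    }

    public static void main(String[] args) {
        // Two instances in different availability zones (hypothetical values):
        Properties east = instanceConfig("k8s-a", "us-east-1a");
        Properties west = instanceConfig("k8s-a", "us-east-1b");
        System.out.println(east.getProperty("client.tag.zone"));
        System.out.println(west.getProperty("client.tag.zone"));
    }
}
```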
(kafka-site) branch update-streams-upgrade-guide-32-33-34 deleted (was 53e122c9)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch update-streams-upgrade-guide-32-33-34 in repository https://gitbox.apache.org/repos/asf/kafka-site.git was 53e122c9 MINOR: update Kafka Streams docs with 3.2/3.3/3.4 KIP information The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(kafka-site) branch asf-site updated: Fixed links to Streams Developer Guide subpages (#588)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new a0d1ea60 Fixed links to Streams Developer Guide subpages (#588) a0d1ea60 is described below commit a0d1ea609afb39406a1b480123f007f49533f884 Author: Michael Lück <1602552+th...@users.noreply.github.com> AuthorDate: Mon Jun 17 17:40:04 2024 +0200 Fixed links to Streams Developer Guide subpages (#588) As it seems the Developer Guide page has been changed for Version 1.0.x and sections for "Streams DSL", "Processor API" and "Interactive Queries" got new, separate pages. But some links have not been changed from links targeting anchors to links to the new subpages --- 10/architecture.html | 2 +- 10/core-concepts.html | 6 +++--- 10/streams/architecture.html | 2 +- 10/streams/core-concepts.html | 4 ++-- 10/streams/tutorial.html | 2 +- 10/tutorial.html | 2 +- 11/streams/architecture.html | 2 +- 11/streams/tutorial.html | 2 +- 20/streams/architecture.html | 2 +- 20/streams/tutorial.html | 2 +- 21/streams/architecture.html | 2 +- 21/streams/tutorial.html | 2 +- 22/streams/architecture.html | 2 +- 22/streams/tutorial.html | 2 +- 23/streams/architecture.html | 2 +- 23/streams/tutorial.html | 2 +- 24/streams/architecture.html | 2 +- 24/streams/tutorial.html | 2 +- 25/streams/tutorial.html | 2 +- 26/streams/tutorial.html | 2 +- 27/streams/tutorial.html | 2 +- 28/streams/tutorial.html | 2 +- 30/streams/tutorial.html | 2 +- 31/streams/tutorial.html | 2 +- 32/streams/tutorial.html | 2 +- 33/streams/tutorial.html | 2 +- 34/streams/tutorial.html | 2 +- 35/streams/tutorial.html | 2 +- 36/streams/tutorial.html | 4 ++-- 37/streams/tutorial.html | 2 +- 30 files changed, 34 insertions(+), 34 deletions(-) diff --git a/10/architecture.html b/10/architecture.html index 0b167621..2d1ffaff 100644 --- a/10/architecture.html +++ 
b/10/architecture.html @@ -98,7 +98,7 @@ Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, -which is an important capability when implementing stateful operations. The Kafka Streams DSL, for example, automatically creates +which is an important capability when implementing stateful operations. The Kafka Streams DSL, for example, automatically creates and manages such state stores when you are calling stateful operators such as join() or aggregate(), or when you are windowing a stream. diff --git a/10/core-concepts.html b/10/core-concepts.html index d0945b28..8f69f9ff 100644 --- a/10/core-concepts.html +++ b/10/core-concepts.html @@ -76,8 +76,8 @@ -Kafka Streams offers two ways to define the stream processing topology: the Kafka Streams DSL provides -the most common data transformation operations such as map, filter, join and aggregations out of the box; the lower-level Processor API allows +Kafka Streams offers two ways to define the stream processing topology: the Kafka Streams DSL provides +the most common data transformation operations such as map, filter, join and aggregations out of the box; the lower-level Processor API allows developers define and connect custom processors as well as to interact with state stores. @@ -131,7 +131,7 @@ Some stream processing applications don't require state, which means the processing of a message is independent from the processing of all other messages. However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you -can join input streams, or group and aggregate data records. Many such stateful operators are provided by the Kafka Streams DSL. +can join input streams, or group and aggregate data records. Many such stateful operators are provided by the Kafka Streams DSL. Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. 
diff --git a/10/streams/architecture.html b/10/streams/architecture.html index 69d1330d..33931ea1 100644 --- a/10/streams/architecture.html +++ b/10/streams/architecture.html @@ -103,7 +103,7 @@ Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data, -which is an important capability when implementing stateful operations. The Kafka Streams DSL, for example, automatically creates +which is an important capability when implementing stateful operations. The Kafka
(kafka-site) branch asf-site updated: Howly added to powered-by page (#498)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 337241eb Howly added to powered-by page (#498) 337241eb is described below commit 337241eb3e4e810380a5480709f35f961918be0f Author: Dariia Moisol <54187995+dariiag...@users.noreply.github.com> AuthorDate: Wed Jun 19 03:04:11 2024 +0300 Howly added to powered-by page (#498) Co-authored-by: Dariia Moisol --- images/powered-by/howly.png | Bin 0 -> 23975 bytes powered-by.html | 6 ++ 2 files changed, 6 insertions(+) diff --git a/images/powered-by/howly.png b/images/powered-by/howly.png new file mode 100644 index ..8889c8b4 Binary files /dev/null and b/images/powered-by/howly.png differ diff --git a/powered-by.html b/powered-by.html index d1f00a2e..55d103d8 100644 --- a/powered-by.html +++ b/powered-by.html @@ -2,6 +2,12 @@
(kafka-site) branch asf-site updated: Added Deep.BI - Predictive Analytics & AI Platform (#500)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 48f6c584 Added Deep.BI - Predictive Analytics & AI Platform (#500) 48f6c584 is described below commit 48f6c584a41ffb2743ed5d7084280b1049b45e13 Author: jmalcone546 <129925985+jmalcone...@users.noreply.github.com> AuthorDate: Wed Jun 19 02:05:16 2024 +0200 Added Deep.BI - Predictive Analytics & AI Platform (#500) Hi there, I'd like to request to add Deep.BI (https://www.crunchbase.com/organization/deep-bi) to the powered-by list. I added the link to our live logo - if this should be done differently please let me know! You can find out more about us through our Druid Summit presentation last year to verify our architecture and Kafka usage: https://www.youtube.com/watch?v=oc3Bmi1XHX8. --- powered-by.html | 5 + 1 file changed, 5 insertions(+) diff --git a/powered-by.html b/powered-by.html index 55d103d8..034d3412 100644 --- a/powered-by.html +++ b/powered-by.html @@ -229,6 +229,11 @@ "logo": "deephaven.svg", "logoBgColor": "#040427", "description": "Deephaven is a query engine for streaming workloads. Deephaven enables you to ingest and transform Kafka feeds as live dataframes." +}, { +"link": "http://www.deep.bi/";, +"logo": "https://images.squarespace-cdn.com/content/v1/5c335a239f87708868173efd/1547466246389-VD6SYBGIGV89AW7NMBIX/Logo.png";, +"logoBgColor": "#ff", +"description": "Deep.BI helps enterprises leverage next-gen data & AI pipelines, powered by Apache Druid & Apache Flink. We use Kafka to process hundreds of thousands of real-time events per second." }, { "link": "https://developer.ibm.com/messaging/message-hub/";, "logo": "ibmmessagehub.png",
(kafka-site) branch asf-site updated: Add Skillsoft to Powered By (#601)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 9203d542 Add Skillsoft to Powered By (#601) 9203d542 is described below commit 9203d54213c292f2fa7ebadade338dc73bcb6fbf Author: Brandon Powers AuthorDate: Tue Jun 18 20:16:05 2024 -0400 Add Skillsoft to Powered By (#601) --- images/powered-by/skillsoft.png | Bin 0 -> 11126 bytes powered-by.html | 5 + 2 files changed, 5 insertions(+) diff --git a/images/powered-by/skillsoft.png b/images/powered-by/skillsoft.png new file mode 100644 index ..95645dec Binary files /dev/null and b/images/powered-by/skillsoft.png differ diff --git a/powered-by.html b/powered-by.html index 034d3412..e9161e5f 100644 --- a/powered-by.html +++ b/powered-by.html @@ -579,6 +579,11 @@ "logo": "sentry.png", "logoBgColor": "#ff", "description": "Sentry uses Apache Kafka® as our main platform for streaming data throughout the product, acting as our persistent and highly-available transport between services dedicated to event ingestion, product search, business intelligence, and machine learning." +}, { +"link": "https://www.skillsoft.com/";, +"logo": "skillsoft.png", +"logoBgColor": "#ff", +"description": "At Skillsoft, Apache Kafka is a vital component of our online learning platform, Percipio, empowering features such as activity tracking and content recommendation for learners." }, { "link": "http://www.skyscanner.net/";, "logo": "skyscanner.png",
(kafka) branch trunk updated: KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] (#15495)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e2060204fea KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] (#15495) e2060204fea is described below commit e2060204fead9aa03f611b76c543b6824f8eb26b Author: Jinyong Choi AuthorDate: Wed Jun 19 09:45:19 2024 +0900 KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] (#15495) Reviewers: Matthias J. Sax --- docs/streams/developer-guide/memory-mgmt.html | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/streams/developer-guide/memory-mgmt.html b/docs/streams/developer-guide/memory-mgmt.html index 591b97bb180..661db58f956 100644 --- a/docs/streams/developer-guide/memory-mgmt.html +++ b/docs/streams/developer-guide/memory-mgmt.html @@ -151,6 +151,8 @@ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); Serdes.Long()) .withCachingEnabled(); Record caches are not supported for versioned state stores. + To avoid reading stale data, you can flush() the store before creating the iterator. +Note that flushing too often can lead to performance degradation if RocksDB is used, so we advise avoiding manual flushing in general. RocksDB
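The stale-read behavior this doc fix describes (a cached write or tombstone that an iterator over the underlying store does not see until flush) can be modeled with a toy write-cache. This is a deliberately simplified stand-in, not the actual Kafka Streams caching layer or RocksDB store:

```java
import java.util.HashMap;
import java.util.Map;

public class CachedStoreSketch {
    // Toy model: puts/deletes land in a cache; all() reads only the backing
    // store, so un-flushed changes are invisible to the iterator (the
    // "stale value" of KAFKA-15302).
    private final Map<String, String> backing = new HashMap<>();
    private final Map<String, String> cache = new HashMap<>(); // null value = tombstone

    void put(String k, String v) { cache.put(k, v); }
    void delete(String k)        { cache.put(k, null); }

    // Mimics store.all(): a snapshot of the backing store only.
    Map<String, String> all() { return new HashMap<>(backing); }

    // Mimics flush(): apply cached writes and tombstones to the backing store.
    void flush() {
        cache.forEach((k, v) -> { if (v == null) backing.remove(k); else backing.put(k, v); });
        cache.clear();
    }

    public static void main(String[] args) {
        CachedStoreSketch store = new CachedStoreSketch();
        store.put("a", "1");
        store.flush();
        store.delete("a");                // tombstone still only in the cache
        System.out.println(store.all());  // stale: the deleted key is still visible
        store.flush();                    // the workaround from the doc change
        System.out.println(store.all());  // now empty
    }
}
```

As the doc change warns, in the real store flushing before every iterator is costly with RocksDB, so the workaround should be used sparingly.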
(kafka) branch 3.8 updated: KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] (#15495)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new 0225c49f8f8 KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] (#15495) 0225c49f8f8 is described below commit 0225c49f8f898cb19eddf1dc33986fbf47c9f7da Author: Jinyong Choi AuthorDate: Wed Jun 19 09:45:19 2024 +0900 KAFKA-15302: Stale value returned when using store.all() with key deletion [docs] (#15495) Reviewers: Matthias J. Sax --- docs/streams/developer-guide/memory-mgmt.html | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/streams/developer-guide/memory-mgmt.html b/docs/streams/developer-guide/memory-mgmt.html index 7f5bc09cef5..c56d9a395a1 100644 --- a/docs/streams/developer-guide/memory-mgmt.html +++ b/docs/streams/developer-guide/memory-mgmt.html @@ -151,6 +151,8 @@ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); Serdes.Long()) .withCachingEnabled(); Record caches are not supported for versioned state stores. + To avoid reading stale data, you can flush() the store before creating the iterator. +Note that flushing too often can lead to performance degradation if RocksDB is used, so we advise avoiding manual flushing in general. RocksDB
(kafka-site) branch asf-site updated: Add edenlab.svg (#502)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 087438d6 Add edenlab.svg (#502) 087438d6 is described below commit 087438d6183af10bcb10d87b86280cf5043b0594 Author: Andrew Krylov AuthorDate: Fri Jun 21 02:48:07 2024 +0200 Add edenlab.svg (#502) Added Edenlab logo for powered-by page --- images/powered-by/edenlab.svg | 1 + 1 file changed, 1 insertion(+) diff --git a/images/powered-by/edenlab.svg b/images/powered-by/edenlab.svg new file mode 100644 index ..07b94a90 --- /dev/null +++ b/images/powered-by/edenlab.svg @@ -0,0 +1 @@ +http://www.w3.org/2000/svg"; viewBox="0 0 800 267.84">.cls-1{fill:#081123;}logo-big
(kafka-site) branch asf-site updated: Update powered-by.html (#503)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 215f3732 Update powered-by.html (#503) 215f3732 is described below commit 215f3732e1a1d2438933f42c2be57965128bf838 Author: Andrew Krylov AuthorDate: Fri Jun 21 02:48:36 2024 +0200 Update powered-by.html (#503) Added Edenlab case --- powered-by.html | 5 + 1 file changed, 5 insertions(+) diff --git a/powered-by.html b/powered-by.html index e9161e5f..5b7fdd60 100644 --- a/powered-by.html +++ b/powered-by.html @@ -784,6 +784,11 @@ "logo": "nussknacker.svg", "logoBgColor": "#ff", "description": "Nussknacker is a low-code tool that allows IT teams to hand over decision algorithms to non-technical users. Apache Kafka is Nussknacker's primary input and output interface in streaming use cases - Nussknacker reads events from Kafka, applies decision algorithms and outputs actions to Kafka." +}, { + "link": "https://edenlab.io", +"logo": "edenlab.svg", +"logoBgColor": "#ff", +"description": "Edenlab FHIR engineers use Kafka to manage real-time data flow between dozens of microservices in our product - Kodjin Interoperability Suite (https://kodjin.com) - a low-code FHIR-based infrastructure for healthcare data management (contains FHIR Server, Terminology Service, and Mapper). Edenlab is a custom software and product development company focused primarily on healthcare data interoperability. We are also working with some major customers from t [...] }];
(kafka-site) branch asf-site updated: Netstratum-logo-updated (#609)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new b5dd64e9 Netstratum-logo-updated (#609) b5dd64e9 is described below commit b5dd64e9768ff29fe55117a935b8f27989e4418f Author: netstratum-labs AuthorDate: Fri Jun 21 20:47:35 2024 +0100 Netstratum-logo-updated (#609) Co-authored-by: Jineed --- images/powered-by/netstratum-logo.png | Bin 0 -> 48125 bytes powered-by.html | 6 ++ 2 files changed, 6 insertions(+) diff --git a/images/powered-by/netstratum-logo.png b/images/powered-by/netstratum-logo.png new file mode 100644 index ..a818082a Binary files /dev/null and b/images/powered-by/netstratum-logo.png differ diff --git a/powered-by.html b/powered-by.html index 5b7fdd60..bae64ff6 100644 --- a/powered-by.html +++ b/powered-by.html @@ -2,6 +2,12 @@
(kafka-site) branch asf-site updated: Adding OREXES to powered-by section (#591)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new 31253b1b Adding OREXES to powered-by section (#591) 31253b1b is described below commit 31253b1bffcd3d14a407b3b1778b5ed1601a61b1 Author: sifu <39193860+sii...@users.noreply.github.com> AuthorDate: Sat Jun 22 03:56:57 2024 +0200 Adding OREXES to powered-by section (#591) --- images/powered-by/orexes.png | Bin 0 -> 15852 bytes powered-by.html | 6 ++ 2 files changed, 6 insertions(+) diff --git a/images/powered-by/orexes.png b/images/powered-by/orexes.png new file mode 100644 index ..18a4f045 Binary files /dev/null and b/images/powered-by/orexes.png differ diff --git a/powered-by.html b/powered-by.html index bae64ff6..b4539026 100644 --- a/powered-by.html +++ b/powered-by.html @@ -14,6 +14,12 @@ "logoBgColor": "#FF", "description": "At Howly, Kafka is a key component of our event-driven architecture. We use Kafka to facilitate communication between microservices, which enables us to build a scalable and fault-tolerant system." }, +{ +"link": "https://orexes.de/";, +"logo": "orexes.png", +"logoBgColor": "#FF", +"description": "OREXES uses Kafka as the central hub of communication in an innovative identity management solution. They leverage Kafka functionalities to achieve real-time analytics, fault-tolerance and a high degree of reliability." +}, { "link": "http://cloudscaleinc.com/";, "logo": "cloud-scale.png",
(kafka) branch trunk updated: KAFKA-16965: Throw cause of TimeoutException (#16344)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f4cbf71ea68 KAFKA-16965: Throw cause of TimeoutException (#16344) f4cbf71ea68 is described below commit f4cbf71ea68f5bda3afbb2d3a8411658e57516e8 Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com> AuthorDate: Mon Jun 24 23:51:27 2024 +0200 KAFKA-16965: Throw cause of TimeoutException (#16344) Add the cause of TimeoutException for Producer send() errors. Reviewers: Matthias J. Sax , Lianet Magrans , Artem Livshits , Justine Olshan --- .../kafka/clients/producer/KafkaProducer.java | 18 ++--- .../producer/internals/ProducerMetadata.java | 10 + .../kafka/clients/producer/KafkaProducerTest.java | 44 +- 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index b0b68aa098e..e71f1d57553 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Header; @@ -1177,18 +1178,25 @@ public class KafkaProducer implements Producer { metadata.awaitUpdate(version, remainingWaitMs); } catch (TimeoutException ex) { // Rethrow with original maxWaitMs to prevent logging exception with 
remainingWaitMs -throw new TimeoutException( -String.format("Topic %s not present in metadata after %d ms.", -topic, maxWaitMs)); +final String errorMessage = String.format("Topic %s not present in metadata after %d ms.", +topic, maxWaitMs); +if (metadata.getError(topic) != null) { +throw new TimeoutException(errorMessage, metadata.getError(topic).exception()); +} +throw new TimeoutException(errorMessage); } cluster = metadata.fetch(); elapsed = time.milliseconds() - nowMs; if (elapsed >= maxWaitMs) { -throw new TimeoutException(partitionsCount == null ? +final String errorMessage = partitionsCount == null ? String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", -partition, topic, partitionsCount, maxWaitMs)); +partition, topic, partitionsCount, maxWaitMs); +if (metadata.getError(topic) != null && metadata.getError(topic).exception() instanceof RetriableException) { +throw new TimeoutException(errorMessage, metadata.getError(topic).exception()); +} +throw new TimeoutException(errorMessage); } metadata.maybeThrowExceptionForTopic(topic); remainingWaitMs = maxWaitMs - elapsed; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java index 852657275f4..0c77ef5fc4c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerMetadata.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.MetadataRequest; import 
org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.utils.LogContext; @@ -42,6 +43,7 @@ public class ProducerMetadata extends Metadata { private final Set newTopics = new HashSet<>(); private final Logger log; private final Time time; +
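The KAFKA-16965 diff above attaches the last known metadata error as the cause of the producer's TimeoutException. The following self-contained sketch mirrors that wrapping and shows the caller-side unwrapping it enables; the exception classes are local stand-ins, not the real org.apache.kafka.common.errors types, and the topic name and sample message are illustrative.

```java
public class TimeoutCauseSketch {
    // Stand-ins for org.apache.kafka.common.errors.* exception types.
    static class TimeoutException extends RuntimeException {
        TimeoutException(String msg) { super(msg); }
        TimeoutException(String msg, Throwable cause) { super(msg, cause); }
    }
    static class TopicAuthorizationException extends RuntimeException {
        TopicAuthorizationException(String msg) { super(msg); }
    }

    // Mirrors the patched KafkaProducer logic: wrap the last metadata error
    // (if any) as the cause instead of throwing a bare timeout.
    static TimeoutException timeoutFor(String topic, long maxWaitMs, RuntimeException metadataError) {
        String msg = String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs);
        return metadataError != null ? new TimeoutException(msg, metadataError)
                                     : new TimeoutException(msg);
    }

    public static void main(String[] args) {
        TimeoutException ex = timeoutFor("orders", 60000,
                new TopicAuthorizationException("Not authorized to access topics: [orders]"));
        // Callers can now distinguish "broker was slow" from a real root cause:
        if (ex.getCause() instanceof TopicAuthorizationException) {
            System.out.println("root cause: " + ex.getCause().getMessage());
        }
    }
}
```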
(kafka) branch trunk updated (3ebad6349de -> dc7c9ad0685)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 3ebad6349de MINOR: Fix missing code tag in doc (#16466) add dc7c9ad0685 MINOR: pass in timeout to Admin.close() (#16422) No new revisions were added by this update. Summary of changes: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 8 +++- 1 file changed, 7 insertions(+), 1 deletion(-)
(kafka) branch trunk updated: KAFKA-16000: Migrated MembershipManagerImplTest away from ConsumerTestBuilder (#16312)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 836f52b0baf KAFKA-16000: Migrated MembershipManagerImplTest away from ConsumerTestBuilder (#16312) 836f52b0baf is described below commit 836f52b0bafd278bbbf064d5ee7bb47e2661f11c Author: brenden20 <118419078+brende...@users.noreply.github.com> AuthorDate: Fri Jun 28 14:13:27 2024 -0500 KAFKA-16000: Migrated MembershipManagerImplTest away from ConsumerTestBuilder (#16312) Finishing migration of MembershipManagerImplTest away from ConsumerTestBuilder and removed all spy objects. Reviewers: Lianet Magrans , Philip Nee , Matthias J. Sax --- .../internals/MembershipManagerImplTest.java | 59 +++--- 1 file changed, 29 insertions(+), 30 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java index 46f22bd96b2..c00488ef40d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java @@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -60,10 +59,12 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; import java.util.stream.Stream; import static 
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks; +import static org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; @@ -82,6 +83,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyCollection; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; @@ -97,14 +99,11 @@ public class MembershipManagerImplTest { private static final String MEMBER_ID = "test-member-1"; private static final int REBALANCE_TIMEOUT = 100; private static final int MEMBER_EPOCH = 1; +private static final LogContext LOG_CONTEXT = new LogContext(); -private final LogContext logContext = new LogContext(); private SubscriptionState subscriptionState; private ConsumerMetadata metadata; - private CommitRequestManager commitRequestManager; - -private ConsumerTestBuilder testBuilder; private BlockingQueue backgroundEventQueue; private BackgroundEventHandler backgroundEventHandler; private Time time; @@ -113,22 +112,16 @@ public class MembershipManagerImplTest { @BeforeEach public void setup() { -testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation()); -metadata = testBuilder.metadata; -subscriptionState = testBuilder.subscriptions; -commitRequestManager = testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new); -backgroundEventQueue = testBuilder.backgroundEventQueue; -backgroundEventHandler = testBuilder.backgroundEventHandler; +metadata = mock(ConsumerMetadata.class); 
+subscriptionState = mock(SubscriptionState.class); +commitRequestManager = mock(CommitRequestManager.class); +backgroundEventQueue = new LinkedBlockingQueue<>(); +backgroundEventHandler = new BackgroundEventHandler(backgroundEventQueue); time = new MockTime(0); metrics = new Metrics(time); rebalanceMetricsManager = new RebalanceMetricsManager(metrics); -} -@AfterEach -public void tearDown() { -if (testBuilder != null) { -testBuilder.close(); -} + when(commitRequestManager.maybeAutoCommitSyncBeforeRevocation(anyLong())).thenReturn(CompletableFuture.completedFuture(null)); } private MembershipManagerImpl createMembershipManagerJoiningGroup() { @@ -144,7
(kafka) branch trunk updated: KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 15a4501bded KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450) 15a4501bded is described below commit 15a4501bded513822485dd85fa6258e16f1571ca Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com> AuthorDate: Sun Jun 30 20:52:36 2024 +0200 KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450) For a non-existing output topic, Kafka Streams ends up in an infinite retry loop, because the returned TimeoutException extends RetriableException. This PR updates the error handling path for this case: instead of retrying, it calls the ProductionExceptionHandler, allowing the infinite retry loop to be broken. Reviewers: Matthias J. Sax --- .../processor/internals/RecordCollectorImpl.java | 13 +- .../integration/CustomHandlerIntegrationTest.java | 166 + .../processor/internals/RecordCollectorTest.java | 45 ++ 3 files changed, 223 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 597d36e5d7b..16d67666ccb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.header.Headers; import
org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; @@ -306,7 +307,7 @@ public class RecordCollectorImpl implements RecordCollector { "indicating the task may be migrated out"; sendException.set(new TaskMigratedException(errorMessage, exception)); } else { -if (exception instanceof RetriableException) { +if (isRetriable(exception)) { errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " + "or the connection to broker was interrupted sending the request or receiving the response. " + "\nConsider overwriting `max.block.ms` and /or " + @@ -326,6 +327,16 @@ public class RecordCollectorImpl implements RecordCollector { log.error(errorMessage, exception); } +/** + * The `TimeoutException` with root cause `UnknownTopicOrPartitionException` is considered as non-retriable + * (despite `TimeoutException` being a subclass of `RetriableException`, this particular case is explicitly excluded). +*/ +private boolean isRetriable(final Exception exception) { +return exception instanceof RetriableException && +(!(exception instanceof TimeoutException) || exception.getCause() == null +|| !(exception.getCause() instanceof UnknownTopicOrPartitionException)); +} + private boolean isFatalException(final Exception exception) { final boolean securityException = exception instanceof AuthenticationException || exception instanceof AuthorizationException || diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java new file mode 100644 index 000..3eea2ec7d84 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the spe
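The retriable check introduced by this commit can be exercised in isolation. The sketch below mirrors the logic of `RecordCollectorImpl#isRetriable` using stand-in exception classes (the real ones live in `org.apache.kafka.common.errors`); it is an illustration of the decision rule, not the actual Kafka code:

```java
public class RetriableCheck {
    // Stand-ins for the org.apache.kafka.common.errors hierarchy (assumed shapes).
    static class RetriableException extends RuntimeException {
        RetriableException() { }
        RetriableException(Throwable cause) { super(cause); }
    }
    static class TimeoutException extends RetriableException {
        TimeoutException() { }
        TimeoutException(Throwable cause) { super(cause); }
    }
    static class UnknownTopicOrPartitionException extends RuntimeException { }

    // Mirrors the patched check: a TimeoutException whose cause is
    // UnknownTopicOrPartitionException is treated as NON-retriable, so the
    // ProductionExceptionHandler runs instead of Streams retrying forever.
    static boolean isRetriable(final Exception exception) {
        return exception instanceof RetriableException
            && (!(exception instanceof TimeoutException)
                || exception.getCause() == null
                || !(exception.getCause() instanceof UnknownTopicOrPartitionException));
    }

    public static void main(String[] args) {
        System.out.println(isRetriable(new RetriableException()));  // true
        System.out.println(isRetriable(new TimeoutException()));    // true
        System.out.println(isRetriable(
            new TimeoutException(new UnknownTopicOrPartitionException()))); // false
    }
}
```

A plain `TimeoutException` stays retriable; only the unknown-topic case is carved out, which is what breaks the loop for a non-existing output topic.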
(kafka) branch trunk updated: MINOR: Generate javadocs on all source files for streams:test-utils (#16556)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 43676f7612b MINOR: Generate javadocs on all source files for streams:test-utils (#16556) 43676f7612b is described below commit 43676f7612b2155ecada54c61b129d996f58bae2 Author: Vincent Rose AuthorDate: Tue Jul 9 11:32:16 2024 -0700 MINOR: Generate javadocs on all source files for streams:test-utils (#16556) Reviewers: Matthias J. Sax --- build.gradle | 4 1 file changed, 4 deletions(-) diff --git a/build.gradle b/build.gradle index 0199149153a..3c202567b04 100644 --- a/build.gradle +++ b/build.gradle @@ -2518,10 +2518,6 @@ project(':streams:test-utils') { testRuntimeOnly libs.junitPlatformLanucher } - javadoc { -include "**/org/apache/kafka/streams/test/**" - } - tasks.create(name: "copyDependantLibs", type: Copy) { from (configurations.runtimeClasspath) { exclude('kafka-streams*')
(kafka) branch 3.8 updated (1dd93857943 -> 1dd16c4f2e1)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git from 1dd93857943 KAFKA-10199: Close pending active tasks to init on partitions lost (#16545) add 1dd16c4f2e1 MINOR: Generate javadocs on all source files for streams:test-utils (#16556) No new revisions were added by this update. Summary of changes: build.gradle | 4 1 file changed, 4 deletions(-)
(kafka) branch trunk updated: MINOR: Add more debug logging to EOSUncleanShutdownIntegrationTest (#16490)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a9b2b36908e MINOR: Add more debug logging to EOSUncleanShutdownIntegrationTest (#16490) a9b2b36908e is described below commit a9b2b36908ed1c21f991ec8945c8fe9e2c42b2e3 Author: Matthias J. Sax AuthorDate: Sun Jul 14 10:28:31 2024 -0700 MINOR: Add more debug logging to EOSUncleanShutdownIntegrationTest (#16490) Reviewers: Bruno Cadonna --- .../kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java | 5 + 1 file changed, 5 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java index 90193c64cb1..ed67049aaa9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java @@ -127,6 +127,9 @@ public class EOSUncleanShutdownIntegrationTest { driver.cleanUp(); driver.start(); +TestUtils.waitForCondition(() -> driver.state().equals(State.RUNNING), +"Expected RUNNING state but driver is on " + driver.state()); + // Task's StateDir final File taskStateDir = new File(String.join("/", TEST_FOLDER.getPath(), appId, "0_0")); final File taskCheckpointFile = new File(taskStateDir, ".checkpoint"); @@ -144,6 +147,8 @@ public class EOSUncleanShutdownIntegrationTest { TestUtils.waitForCondition(() -> recordCount.get() == RECORD_TOTAL, "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get()); +} catch (final Exception e) { +e.printStackTrace(); } finally { TestUtils.waitForCondition(() -> driver.state().equals(State.ERROR), "Expected ERROR state but driver is on " + driver.state());
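The `TestUtils.waitForCondition` calls added above poll until a predicate holds or a deadline passes. A minimal version of such a helper (a hypothetical sketch, not Kafka's `TestUtils` implementation, which also takes default timeouts and supplies the message lazily) looks like:

```java
public class WaitUtil {
    // Polls `condition` every `pollMs` until it returns true; throws with
    // `message` once `timeoutMs` has elapsed without the condition holding.
    static void waitForCondition(java.util.function.BooleanSupplier condition,
                                 long timeoutMs, long pollMs, String message) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException(message);
            }
            try {
                Thread.sleep(pollMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
    }

    public static void main(String[] args) {
        final long start = System.currentTimeMillis();
        // Condition that becomes true after roughly 50 ms.
        waitForCondition(() -> System.currentTimeMillis() - start >= 50, 5_000, 10,
                "condition never became true");
        System.out.println("condition met");
    }
}
```

Waiting for `State.RUNNING` before producing input, as the patch does, removes a race where the driver is still rebalancing when the test starts asserting.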
(kafka-site) branch asf-site updated: Update powered-by_adding SPITHA.html (#611)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new b8d1dc30 Update powered-by_adding SPITHA.html (#611) b8d1dc30 is described below commit b8d1dc3097ce87439268ade743da1258e1267524 Author: VictorParkM <122331665+victorpa...@users.noreply.github.com> AuthorDate: Fri Jul 19 08:34:29 2024 +0900 Update powered-by_adding SPITHA.html (#611) --- images/powered-by/spitha.png | Bin 0 -> 3083 bytes powered-by.html | 5 + 2 files changed, 5 insertions(+) diff --git a/images/powered-by/spitha.png b/images/powered-by/spitha.png new file mode 100644 index ..3347616f Binary files /dev/null and b/images/powered-by/spitha.png differ diff --git a/powered-by.html b/powered-by.html index b4539026..2d8a4d07 100644 --- a/powered-by.html +++ b/powered-by.html @@ -801,6 +801,11 @@ "logo": "edenlab.svg", "logoBgColor": "#ff", "description": "Edenlab FHIR engineers use Kafka to manage real-time data flow between dozens of microservices in our product - https://kodjin.com>Kodjin Interoperability Suite - a low-code FHIR-based infrastructure for healthcare data management (contains FHIR Server, Terminology Service, and Mapper). Edenlab is a custom software and product development company focused primarily on healthcare data interoperability. We are also working with some major customers from t [...] +}, { + "link": "https://spitha.io/", + "logo": "spitha.png", + "logoBgColor": "#ff", + "description": "Based in South Korea, SPITHA is a team of experts specializing in Apache Kafka. We are driven by the question of how to make Kafka easier for users, and with that in mind, we are developing 'Felice,' a robust tool designed to streamline the operation and management of Kafka. Alongside this, we offer in-depth technical consulting and support services to provide comprehensive solutions to our clients."
}];
(kafka) branch trunk updated: KAFKA-16558: Implemented HeartbeatRequestState toStringBase() and added a test for it (#16373)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 41beee508e4 KAFKA-16558: Implemented HeartbeatRequestState toStringBase() and added a test for it (#16373) 41beee508e4 is described below commit 41beee508e41b255d0766917894d63e56ceb7220 Author: brenden20 <118419078+brende...@users.noreply.github.com> AuthorDate: Thu Jul 25 15:27:17 2024 -0500 KAFKA-16558: Implemented HeartbeatRequestState toStringBase() and added a test for it (#16373) Reviewers: Kirk True , Matthias J. Sax --- .../internals/HeartbeatRequestManager.java | 7 + .../internals/HeartbeatRequestManagerTest.java | 30 ++ 2 files changed, 37 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java index 8a62051b2f4..6d6e776a1e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java @@ -486,6 +486,13 @@ public class HeartbeatRequestManager implements RequestManager { this.heartbeatTimer.reset(heartbeatIntervalMs); } +@Override +public String toStringBase() { +return super.toStringBase() + +", remainingMs=" + heartbeatTimer.remainingMs() + +", heartbeatIntervalMs=" + heartbeatIntervalMs; +} + /** * Check if a heartbeat request should be sent on the current time. 
A heartbeat should be * sent if the heartbeat timer has expired, backoff has expired, and there is no request diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 67fb7267f02..cd8311b32e1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -62,6 +62,7 @@ import java.util.Set; import java.util.SortedSet; import static org.apache.kafka.common.utils.Utils.mkSortedSet; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -172,6 +173,35 @@ public class HeartbeatRequestManagerTest { } +@Test +public void testHeartBeatRequestStateToStringBase() { +long retryBackoffMs = 100; +long retryBackoffMaxMs = 1000; +LogContext logContext = new LogContext(); +HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestState( +logContext, +time, +DEFAULT_HEARTBEAT_INTERVAL_MS, +retryBackoffMs, +retryBackoffMaxMs, +.2 +); + +RequestState requestState = new RequestState( +logContext, +HeartbeatRequestManager.HeartbeatRequestState.class.getName(), +retryBackoffMs, +retryBackoffMaxMs +); + +String target = requestState.toStringBase() + +", remainingMs=" + DEFAULT_HEARTBEAT_INTERVAL_MS + +", heartbeatIntervalMs=" + DEFAULT_HEARTBEAT_INTERVAL_MS; + +assertDoesNotThrow(heartbeatRequestState::toString); +assertEquals(target, heartbeatRequestState.toStringBase()); +} + @Test public void testHeartbeatOnStartup() { NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
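The `toStringBase()` pattern tested above composes a subclass's string representation from its superclass's. A self-contained sketch of the idea (stand-in fields; the real `RequestState` carries more state than shown here):

```java
public class ToStringBaseDemo {
    // Base class exposes toStringBase() for subclasses to extend.
    static class RequestState {
        final long retryBackoffMs;
        RequestState(long retryBackoffMs) { this.retryBackoffMs = retryBackoffMs; }

        protected String toStringBase() {
            return "retryBackoffMs=" + retryBackoffMs;
        }

        @Override
        public String toString() {
            return getClass().getSimpleName() + "{" + toStringBase() + "}";
        }
    }

    static class HeartbeatRequestState extends RequestState {
        final long heartbeatIntervalMs;
        HeartbeatRequestState(long retryBackoffMs, long heartbeatIntervalMs) {
            super(retryBackoffMs);
            this.heartbeatIntervalMs = heartbeatIntervalMs;
        }

        @Override
        protected String toStringBase() {
            // Append subclass fields to the base representation, as the patch does.
            return super.toStringBase() + ", heartbeatIntervalMs=" + heartbeatIntervalMs;
        }
    }

    public static void main(String[] args) {
        System.out.println(new HeartbeatRequestState(100, 1000));
        // -> HeartbeatRequestState{retryBackoffMs=100, heartbeatIntervalMs=1000}
    }
}
```

Because `toString()` lives once in the base class and delegates to the overridable `toStringBase()`, every subclass gets a consistent `ClassName{fields}` format for free.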
(kafka) branch trunk updated: KAFKA-16448: Fix processing exception handler (#16663)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 09be14bb09d KAFKA-16448: Fix processing exception handler (#16663) 09be14bb09d is described below commit 09be14bb09dc336f941a7859232094bfb3cb3b96 Author: Sebastien Viale AuthorDate: Fri Jul 26 01:17:31 2024 +0200 KAFKA-16448: Fix processing exception handler (#16663) Co-authored-by: Dabz Co-authored-by: loicgreffier Minor code improvements across different classes, related to the `ProcessingExceptionHandler` implementation (KIP-1033). Reviewers: Bruno Cadonna , Matthias J. Sax --- .../LogAndFailProcessingExceptionHandler.java | 2 +- .../internals/DefaultErrorHandlerContext.java | 16 - .../internals/FailedProcessingException.java | 8 ++--- .../streams/processor/internals/StreamTask.java| 42 -- 4 files changed, 36 insertions(+), 32 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java index 47fdb09c9c2..9c2cf91c605 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java @@ -32,7 +32,7 @@ public class LogAndFailProcessingExceptionHandler implements ProcessingException @Override public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record record, final Exception exception) { -log.warn("Exception caught during message processing, " + +log.error("Exception caught during message processing, " + "processor node: {}, taskId: {}, source topic: {}, source partition: {}, source offset: {}", context.processorNodeId(), context.taskId(), context.topic(), context.partition(), context.offset(), exception);
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index fc6b6048cb9..ff79860d77e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -53,41 +53,41 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { @Override public String topic() { -return this.topic; +return topic; } @Override public int partition() { -return this.partition; +return partition; } @Override public long offset() { -return this.offset; +return offset; } @Override public Headers headers() { -return this.headers; +return headers; } @Override public byte[] sourceRawKey() { -return this.sourceRawKey; +return sourceRawKey; } @Override public byte[] sourceRawValue() { -return this.sourceRawValue; +return sourceRawValue; } @Override public String processorNodeId() { -return this.processorNodeId; +return processorNodeId; } @Override public TaskId taskId() { -return this.taskId; +return taskId; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java index 81b2a2d4fb1..25f2ae9f6cc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/FailedProcessingException.java @@ -16,16 +16,16 @@ */ package org.apache.kafka.streams.errors.internals; -import org.apache.kafka.common.KafkaException; +import org.apache.kafka.streams.errors.StreamsException; /** * {@link FailedProcessingException} is the top-level exception type generated by Kafka Streams, and indicates errors have * occurred during a {@link 
org.apache.kafka.streams.processor.internals.ProcessorNode ProcessorNode's} processing. */ -public class FailedProcessingException extends KafkaException { +public class FailedProcessingException extends StreamsException { private static final long serialVersionUID = 1L; -public FailedProcessingException(final Throwable throwable) { -super(throwable); +public FailedProcessingException(final Exception exception) { +super(exception); } } diff --git a/str
(kafka) branch trunk updated: KAFKA-17204: KafkaStreamsCloseOptionsIntegrationTest.before leaks AdminClient (#16692)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new a07294a7327 KAFKA-17204: KafkaStreamsCloseOptionsIntegrationTest.before leaks AdminClient (#16692) a07294a7327 is described below commit a07294a732704b5322df6611088d8d33bc7d3048 Author: TengYao Chi AuthorDate: Sat Jul 27 01:32:39 2024 +0800 KAFKA-17204: KafkaStreamsCloseOptionsIntegrationTest.before leaks AdminClient (#16692) To avoid a resource leak, we need to close the AdminClient after the test. Reviewers: Lianet Magrans , Matthias J. Sax --- .../streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java index c7c34600fc8..d7ce484ba55 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java @@ -90,6 +90,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest { @AfterAll public static void closeCluster() { +Utils.closeQuietly(adminClient, "admin"); CLUSTER.stop(); }
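`Utils.closeQuietly` closes a resource while swallowing any exception, so cleanup code never masks the actual test outcome. A minimal sketch of the idea (the real Kafka `Utils.closeQuietly` logs the failure; this stand-in just prints):

```java
public class CloseQuietlyDemo {
    // Closes `closeable`, swallowing any exception so cleanup cannot fail the caller.
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null) {
            return; // null-safe: nothing to close
        }
        try {
            closeable.close();
        } catch (Exception e) {
            System.out.println("Failed to close " + name + ": " + e);
        }
    }

    public static void main(String[] args) {
        closeQuietly(null, "absent");                                   // no-op for null
        closeQuietly(() -> System.out.println("closed"), "resource");   // prints "closed"
        closeQuietly(() -> { throw new RuntimeException("boom"); }, "broken");
        System.out.println("cleanup finished");                         // reached despite the failure
    }
}
```

In an `@AfterAll` hook this matters: if closing the `AdminClient` threw, a later `CLUSTER.stop()` would be skipped and the cluster itself would leak.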
(kafka) branch trunk updated: MINOR: update CachingPersistentWindowStoreTest (#16701)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b6c1cb0eec4 MINOR: update CachingPersistentWindowStoreTest (#16701) b6c1cb0eec4 is described below commit b6c1cb0eec4cad56f8db6eba05d24f6b3c44a211 Author: Matthias J. Sax AuthorDate: Mon Jul 29 12:45:13 2024 -0700 MINOR: update CachingPersistentWindowStoreTest (#16701) Refactor test to move off deprecated `transform()` in favor of `process()`. Reviewers: Bill Bejeck --- .../CachingPersistentWindowStoreTest.java | 41 ++ 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java index 07293d2d4b9..6a055f51cf9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java @@ -32,11 +32,12 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; -import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.query.Position; @@ -102,7 +103,7 @@ public class CachingPersistentWindowStoreTest { private static final String TOPIC = "topic"; private static final String CACHE_NAMESPACE = "0_0-store-name"; -private InternalMockProcessorContext context; +private InternalMockProcessorContext context; private RocksDBSegmentedBytesStore bytesStore; private WindowStore underlyingStore; private CachingWindowStore cachingStore; @@ -138,8 +139,8 @@ public class CachingPersistentWindowStoreTest { final WindowStore inner = mock(WindowStore.class); final CachingWindowStore outer = new CachingWindowStore(inner, WINDOW_SIZE, SEGMENT_INTERVAL); when(inner.name()).thenReturn("store"); -outer.init((ProcessorContext) context, outer); -verify(inner).init((ProcessorContext) context, outer); +outer.init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); + verify(inner).init((org.apache.kafka.streams.processor.ProcessorContext) context, outer); } @SuppressWarnings("unchecked") @@ -153,30 +154,28 @@ public class CachingPersistentWindowStoreTest { } @Test -@SuppressWarnings("deprecation") public void shouldNotReturnDuplicatesInRanges() { final StreamsBuilder builder = new StreamsBuilder(); final StoreBuilder> storeBuilder = Stores.windowStoreBuilder( -Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false), -Serdes.String(), -Serdes.String()) +Stores.persistentWindowStore("store-name", ofHours(1L), ofMinutes(1L), false), +Serdes.String(), +Serdes.String()) .withCachingEnabled(); builder.addStateStore(storeBuilder); builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())) -.transform(() -> new Transformer>() { +.process(() -> new Processor() { private WindowStore store; private int numRecordsProcessed; -private ProcessorContext context; +private ProcessorContext context; -@SuppressWarnings("unchecked") @Override -public void init(final 
ProcessorContext processorContext) { +public void init(final ProcessorContext processorContext) { this.context = processorContext; -this.store = (WindowStore) processorContext.getStateStore("store-name"); +this.store = processorContext.getStateStore("store-name"); int count = 0;
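The migration above moves from the deprecated `transform()`/`Transformer` to `process()` with the newer `org.apache.kafka.streams.processor.api.Processor`. The shape of the new-style API can be sketched with minimal stand-in types (these are illustrations only, not the real Kafka Streams interfaces, which also carry output key/value type parameters and a `ProcessorContext`):

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessorDemo {
    // Stand-in for org.apache.kafka.streams.processor.api.Record.
    record Record<K, V>(K key, V value) { }

    // Stand-in for the new-style Processor: no return value from process();
    // results are forwarded via the context (here simply collected).
    interface Processor<KIn, VIn> {
        default void init() { }
        void process(Record<KIn, VIn> record);
    }

    static class CountingProcessor implements Processor<String, String> {
        final List<String> seen = new ArrayList<>();
        int numRecordsProcessed;

        @Override
        public void process(Record<String, String> record) {
            numRecordsProcessed++;
            seen.add(record.key() + "=" + record.value());
        }
    }

    public static void main(String[] args) {
        CountingProcessor p = new CountingProcessor();
        p.init();
        p.process(new Record<>("a", "1"));
        p.process(new Record<>("b", "2"));
        System.out.println(p.numRecordsProcessed); // 2
    }
}
```

Unlike `Transformer#transform`, which returned a value, the new `process()` pushes records forward explicitly, which is why the refactored test no longer needs the raw-typed casts the old API forced.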
(kafka) branch trunk updated: KAFKA-16448: Add ErrorHandlerContext in production exception handler (#16433)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new b6d5f0556cc KAFKA-16448: Add ErrorHandlerContext in production exception handler (#16433) b6d5f0556cc is described below commit b6d5f0556ccde4490cf76a3af53244915e2d7f99 Author: Sebastien Viale AuthorDate: Tue Jul 30 05:17:15 2024 +0200 KAFKA-16448: Add ErrorHandlerContext in production exception handler (#16433) This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR exposes the new ErrorHandlerContext as a parameter to the production exception handler and deprecates the previous handle signature. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- .../errors/DefaultProductionExceptionHandler.java | 8 + .../kafka/streams/errors/ErrorHandlerContext.java | 28 --- .../streams/errors/ProductionExceptionHandler.java | 49 +- .../internals/DefaultErrorHandlerContext.java | 16 -- .../processor/internals/CorruptedRecord.java | 2 +- .../internals/GlobalStateManagerImpl.java | 3 +- .../processor/internals/GlobalStateUpdateTask.java | 3 +- .../processor/internals/ProcessorAdapter.java | 3 +- .../processor/internals/ProcessorContextImpl.java | 3 +- .../streams/processor/internals/ProcessorNode.java | 2 - .../internals/ProcessorRecordContext.java | 18 +- .../processor/internals/RecordCollectorImpl.java | 188 +++-- .../streams/processor/internals/RecordQueue.java | 2 +- .../streams/processor/internals/SinkNode.java | 3 +- .../streams/processor/internals/StampedRecord.java | 18 +- .../streams/processor/internals/StreamTask.java| 6 +- .../AlwaysContinueProductionExceptionHandler.java | 44 - .../ProcessingExceptionHandlerIntegrationTest.java | 2 - .../processor/internals/ProcessorNodeTest.java | 7
+- .../processor/internals/RecordCollectorTest.java | 187 .../kafka/test/InternalMockProcessorContext.java | 7 + 21 files changed, 367 insertions(+), 232 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java index 33a95f4b445..0896114cf28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java @@ -25,12 +25,20 @@ import java.util.Map; * happens while attempting to produce result records. */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { +@Deprecated @Override public ProductionExceptionHandlerResponse handle(final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.FAIL; } +@Override +public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { +return ProductionExceptionHandlerResponse.FAIL; +} + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java index 0c505475490..6c5e4f19596 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java @@ -89,34 +89,6 @@ public interface ErrorHandlerContext { */ Headers headers(); -/** - * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. 
- * - * If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - * If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent - * to the repartition topic. - * - * @return the raw byte of the key of the source message - */ -byte[] sourceRawKey(); - -/** - * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. - * - * If this method is invoked within a {@link
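The deprecation strategy used here, keeping the old `handle` signature while adding a new overload, is the standard default-method technique for evolving a Java interface without breaking existing implementations. A self-contained sketch with stand-in types (the real handler interfaces live in `org.apache.kafka.streams.errors` and carry richer context and record types):

```java
public class HandlerEvolutionDemo {
    interface Handler {
        // Old signature, kept as a deprecated default so legacy implementors
        // still compile; new implementors need not override it.
        @Deprecated
        default String handle(String record, Exception exception) {
            throw new UnsupportedOperationException();
        }

        // New signature. The default delegates to the old method, so an
        // implementation written against the old interface keeps working
        // when callers switch to the new entry point.
        default String handle(Object context, String record, Exception exception) {
            return handle(record, exception);
        }
    }

    // A pre-existing implementation that only knows the old signature.
    static class LegacyHandler implements Handler {
        @Override
        public String handle(String record, Exception exception) {
            return "FAIL";
        }
    }

    public static void main(String[] args) {
        Handler h = new LegacyHandler();
        // Callers use the new entry point; the legacy override is still reached.
        System.out.println(h.handle(new Object(), "record", new RuntimeException())); // FAIL
    }
}
```

An implementation that overrides neither method fails loudly at runtime via `UnsupportedOperationException`, rather than silently doing nothing, which matches the pattern in the Kafka diff.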
(kafka) branch trunk updated: KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler (#16432)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new faaef527d7f KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler (#16432) faaef527d7f is described below commit faaef527d7fa5fc05f92853e0b28b6b5cdbf6a23 Author: Sebastien Viale AuthorDate: Tue Jul 30 05:33:33 2024 +0200 KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler (#16432) This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR exposes the new ErrorHandlerContext as a parameter to the deserialization exception handlers and deprecates the previous handle signature. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- .../errors/DeserializationExceptionHandler.java| 26 +++- .../errors/LogAndContinueExceptionHandler.java | 14 +++ .../streams/errors/LogAndFailExceptionHandler.java | 14 +++ .../internals/DefaultErrorHandlerContext.java | 12 +- .../streams/processor/internals/ProcessorNode.java | 1 + .../processor/internals/RecordDeserializer.java| 24 +++- .../internals/RecordDeserializerTest.java | 131 +++-- 7 files changed, 205 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 95ccfeced8e..0d64611de67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.streams.errors; - import org.apache.kafka.clients.consumer.ConsumerRecord; import
org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.ProcessorContext; /** @@ -37,11 +37,27 @@ public interface DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception + * @deprecated Since 3.9. Please use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} + */ +@Deprecated +default DeserializationHandlerResponse handle(final ProcessorContext context, + final ConsumerRecord record, + final Exception exception) { +throw new UnsupportedOperationException(); +} + +/** + * Inspect a record and the exception received. + * + * @param context error handler context + * @param record record that failed deserialization + * @param exception the actual exception */ -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -DeserializationHandlerResponse handle(final ProcessorContext context, - final ConsumerRecord record, - final Exception exception); +default DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { +return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception); +} /** * Enumeration that describes the response from the exception handler.
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java index a468be2e67a..a93b7c99517 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -32,6 +32,7 @@ import java.util.Map; public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class); +@Deprecated @Override public DeserializationHandlerResponse handle(final ProcessorContext context, final ConsumerRecord record, @@ -45,6 +46,19 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH return DeserializationHandlerResponse.CONTINUE; } +@Override +
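The deprecation approach in the diff above — adding a new context-based `handle()` overload as a default method that bridges back to the old one — can be sketched in isolation. This is a simplified, hypothetical model (the type and method names below are stand-ins, not the real Kafka Streams interfaces), showing why existing handler implementations keep working unchanged:

```java
// Simplified sketch of the KIP-1033 deprecation pattern; hypothetical types,
// not the real org.apache.kafka.streams.errors interfaces.
interface DeserializationHandler {
    @Deprecated
    default String handle(String legacyContext, String record, Exception exception) {
        // New-style implementations override the context-based overload
        // instead, so for them this fallback is never reached.
        throw new UnsupportedOperationException();
    }

    // New entry point: the default implementation delegates to the
    // deprecated signature, so legacy handlers still work.
    default String handle(HandlerContext context, String record, Exception exception) {
        return handle(context.legacyContext, record, exception);
    }
}

class HandlerContext {
    final String legacyContext;
    HandlerContext(final String legacyContext) {
        this.legacyContext = legacyContext;
    }
}

// An "old style" handler that only knows the deprecated signature.
class LegacyLogAndContinueHandler implements DeserializationHandler {
    @Override
    @Deprecated
    public String handle(final String legacyContext, final String record, final Exception exception) {
        return "CONTINUE";
    }
}

class BridgeDemo {
    public static void main(final String[] args) {
        final DeserializationHandler handler = new LegacyLogAndContinueHandler();
        // The runtime always invokes the new signature; the default method
        // routes the call to the legacy override.
        System.out.println(handler.handle(new HandlerContext("ctx"), "record",
                new RuntimeException("deserialization failed"))); // prints CONTINUE
    }
}
```

The design choice mirrors the diff: making both methods `default` lets old implementations (which override only the deprecated signature) and new implementations (which override only the context-based one) satisfy the same interface.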
(kafka) branch trunk updated: HOTFIX: fix compilation error
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3db4a781676 HOTFIX: fix compilation error 3db4a781676 is described below commit 3db4a781676199af0be2b67989ca2204c64788fc Author: Matthias J. Sax AuthorDate: Mon Jul 29 21:07:49 2024 -0700 HOTFIX: fix compilation error --- .../apache/kafka/streams/processor/internals/ProcessorNode.java | 2 +- .../kafka/streams/processor/internals/RecordCollectorImpl.java| 8 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 65eec47cb1d..763edc9a045 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -205,7 +205,7 @@ public class ProcessorNode { throw e; } catch (final Exception e) { final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( -null, +null, // only required to pass for DeserializationExceptionHandler internalProcessorContext.topic(), internalProcessorContext.partition(), internalProcessorContext.offset(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 35097153a5f..de4afc2c924 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -301,12 +301,14 @@ public class RecordCollectorImpl implements RecordCollector { try { final DefaultErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext( +null, // only required to pass for DeserializationExceptionHandler context.recordContext().topic(), context.recordContext().partition(), context.recordContext().offset(), context.recordContext().headers(), processorNodeId, -taskId); +taskId +); response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin); } catch (final Exception e) { log.error("Fatal when handling serialization exception", e); @@ -395,12 +397,14 @@ public class RecordCollectorImpl implements RecordCollector { sendException.set(new TaskCorruptedException(Collections.singleton(taskId))); } else { final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( +null, // only required to pass for DeserializationExceptionHandler context.recordContext().topic(), context.recordContext().partition(), context.recordContext().offset(), context.recordContext().headers(), processorNodeId, -taskId); +taskId +); if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
(kafka) 03/03: HOTFIX: fix compilation error
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git commit b8532070f79ff4d13f61e4cd766b9190299aab75 Author: Matthias J. Sax AuthorDate: Mon Jul 29 21:07:49 2024 -0700 HOTFIX: fix compilation error --- .../apache/kafka/streams/processor/internals/ProcessorNode.java | 2 +- .../kafka/streams/processor/internals/RecordCollectorImpl.java| 8 ++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 65eec47cb1d..763edc9a045 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -205,7 +205,7 @@ public class ProcessorNode { throw e; } catch (final Exception e) { final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( -null, +null, // only required to pass for DeserializationExceptionHandler internalProcessorContext.topic(), internalProcessorContext.partition(), internalProcessorContext.offset(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 35097153a5f..de4afc2c924 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -301,12 +301,14 @@ public class RecordCollectorImpl implements RecordCollector { try { final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( +null, // only required to pass for DeserializationExceptionHandler context.recordContext().topic(), 
context.recordContext().partition(), context.recordContext().offset(), context.recordContext().headers(), processorNodeId, -taskId); +taskId +); response = productionExceptionHandler.handleSerializationException(errorHandlerContext, record, exception, origin); } catch (final Exception e) { log.error("Fatal when handling serialization exception", e); @@ -395,12 +397,14 @@ public class RecordCollectorImpl implements RecordCollector { sendException.set(new TaskCorruptedException(Collections.singleton(taskId))); } else { final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( +null, // only required to pass for DeserializationExceptionHandler context.recordContext().topic(), context.recordContext().partition(), context.recordContext().offset(), context.recordContext().headers(), processorNodeId, -taskId); +taskId +); if (productionExceptionHandler.handle(errorHandlerContext, serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) { errorMessage += "\nException handler choose to FAIL the processing, no more records would be sent.";
(kafka) branch 3.9 updated (f26f0b66261 -> b8532070f79)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git from f26f0b66261 tests/kafkatest/version.py: Add 3.9.0 as DEV_VERSION new a4ea9aec73a KAFKA-16448: Add ErrorHandlerContext in production exception handler (#16433) new 10d9f7872d8 KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler (#16432) new b8532070f79 HOTFIX: fix compilation error The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../errors/DefaultProductionExceptionHandler.java | 8 + .../errors/DeserializationExceptionHandler.java| 26 ++- .../kafka/streams/errors/ErrorHandlerContext.java | 28 --- .../errors/LogAndContinueExceptionHandler.java | 14 ++ .../streams/errors/LogAndFailExceptionHandler.java | 14 ++ .../streams/errors/ProductionExceptionHandler.java | 49 +- .../internals/DefaultErrorHandlerContext.java | 28 ++- .../processor/internals/CorruptedRecord.java | 2 +- .../internals/GlobalStateManagerImpl.java | 3 +- .../processor/internals/GlobalStateUpdateTask.java | 3 +- .../processor/internals/ProcessorAdapter.java | 3 +- .../processor/internals/ProcessorContextImpl.java | 3 +- .../streams/processor/internals/ProcessorNode.java | 3 +- .../internals/ProcessorRecordContext.java | 18 +- .../processor/internals/RecordCollectorImpl.java | 192 +++-- .../processor/internals/RecordDeserializer.java| 24 ++- .../streams/processor/internals/RecordQueue.java | 2 +- .../streams/processor/internals/SinkNode.java | 3 +- .../streams/processor/internals/StampedRecord.java | 18 +- .../streams/processor/internals/StreamTask.java| 6 +- .../AlwaysContinueProductionExceptionHandler.java | 44 - .../ProcessingExceptionHandlerIntegrationTest.java | 2 - .../processor/internals/ProcessorNodeTest.java | 
7 +- .../processor/internals/RecordCollectorTest.java | 187 .../internals/RecordDeserializerTest.java | 131 +- .../kafka/test/InternalMockProcessorContext.java | 7 + 26 files changed, 576 insertions(+), 249 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/errors/AlwaysContinueProductionExceptionHandler.java
(kafka) 02/03: KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler (#16432)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 10d9f7872d8fad81cd95f327ba6140994cf35576 Author: Sebastien Viale AuthorDate: Tue Jul 30 05:33:33 2024 +0200 KAFKA-16448: Add ErrorHandlerContext in deserialization exception handler (#16432) This PR is part of KIP-1033, which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR exposes the new ErrorHandlerContext as a parameter to the deserialization exception handlers and deprecates the previous handle signature. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- .../errors/DeserializationExceptionHandler.java| 26 +++- .../errors/LogAndContinueExceptionHandler.java | 14 +++ .../streams/errors/LogAndFailExceptionHandler.java | 14 +++ .../internals/DefaultErrorHandlerContext.java | 12 +- .../streams/processor/internals/ProcessorNode.java | 1 + .../processor/internals/RecordDeserializer.java| 24 +++- .../internals/RecordDeserializerTest.java | 131 +++-- 7 files changed, 205 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 95ccfeced8e..0d64611de67 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.streams.errors; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.Configurable; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.processor.ProcessorContext; /** @@ -37,11 +37,27 @@ public interface
DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception + * @deprecated Since 3.9. Use Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} + */ +@Deprecated +default DeserializationHandlerResponse handle(final ProcessorContext context, + final ConsumerRecord record, + final Exception exception) { +throw new UnsupportedOperationException(); +} + +/** + * Inspect a record and the exception received. + * + * @param context error handler context + * @param record record that failed deserialization + * @param exception the actual exception */ -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -DeserializationHandlerResponse handle(final ProcessorContext context, - final ConsumerRecord record, - final Exception exception); +default DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final Exception exception) { +return handle(((DefaultErrorHandlerContext) context).processorContext().orElse(null), record, exception); +} /** * Enumeration that describes the response from the exception handler. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java index a468be2e67a..a93b7c99517 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java @@ -32,6 +32,7 @@ import java.util.Map; public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class); +@Deprecated @Override public DeserializationHandlerResponse handle(final ProcessorContext context, final ConsumerRecord record, @@ -45,6 +46,19 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH return DeserializationHandlerResponse.CONTINUE; } +@Override +public DeserializationHandlerResponse handle(final ErrorHandlerContext context, + final ConsumerRecord record, + final
(kafka) 01/03: KAFKA-16448: Add ErrorHandlerContext in production exception handler (#16433)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git commit a4ea9aec73a3da470d079bad82d92810cac49d55 Author: Sebastien Viale AuthorDate: Tue Jul 30 05:17:15 2024 +0200 KAFKA-16448: Add ErrorHandlerContext in production exception handler (#16433) This PR is part of KIP-1033, which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR exposes the new ErrorHandlerContext as a parameter to the production exception handler and deprecates the previous handle signature. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- .../errors/DefaultProductionExceptionHandler.java | 8 + .../kafka/streams/errors/ErrorHandlerContext.java | 28 --- .../streams/errors/ProductionExceptionHandler.java | 49 +- .../internals/DefaultErrorHandlerContext.java | 16 -- .../processor/internals/CorruptedRecord.java | 2 +- .../internals/GlobalStateManagerImpl.java | 3 +- .../processor/internals/GlobalStateUpdateTask.java | 3 +- .../processor/internals/ProcessorAdapter.java | 3 +- .../processor/internals/ProcessorContextImpl.java | 3 +- .../streams/processor/internals/ProcessorNode.java | 2 - .../internals/ProcessorRecordContext.java | 18 +- .../processor/internals/RecordCollectorImpl.java | 188 +++-- .../streams/processor/internals/RecordQueue.java | 2 +- .../streams/processor/internals/SinkNode.java | 3 +- .../streams/processor/internals/StampedRecord.java | 18 +- .../streams/processor/internals/StreamTask.java| 6 +- .../AlwaysContinueProductionExceptionHandler.java | 44 - .../ProcessingExceptionHandlerIntegrationTest.java | 2 - .../processor/internals/ProcessorNodeTest.java | 7 +- .../processor/internals/RecordCollectorTest.java | 187 .../kafka/test/InternalMockProcessorContext.java | 7 + 21 files changed, 367 insertions(+), 232 deletions(-) diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java index 33a95f4b445..0896114cf28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java @@ -25,12 +25,20 @@ import java.util.Map; * happens while attempting to produce result records. */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { +@Deprecated @Override public ProductionExceptionHandlerResponse handle(final ProducerRecord record, final Exception exception) { return ProductionExceptionHandlerResponse.FAIL; } +@Override +public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception) { +return ProductionExceptionHandlerResponse.FAIL; +} + @Override public void configure(final Map configs) { // ignore diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java index 0c505475490..6c5e4f19596 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java @@ -89,34 +89,6 @@ public interface ErrorHandlerContext { */ Headers headers(); -/** - * Return the non-deserialized byte[] of the input message key if the context has been triggered by a message. - * - * If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - * If this method is invoked in a sub-topology due to a repartition, the returned key would be one sent - * to the repartition topic. 
- * - * @return the raw byte of the key of the source message - */ -byte[] sourceRawKey(); - -/** - * Return the non-deserialized byte[] of the input message value if the context has been triggered by a message. - * - * If this method is invoked within a {@link Punctuator#punctuate(long) - * punctuation callback}, or while processing a record that was forwarded by a punctuation - * callback, it will return {@code null}. - * - * If this method is
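The production-side change follows the same deprecation scheme: the default handler gains a context-aware `handle()` overload alongside the deprecated one, and both keep the long-standing FAIL behavior. A minimal, self-contained sketch (hypothetical simplified types, not the real Kafka API):

```java
// Simplified model of the ProductionExceptionHandler deprecation pattern
// from the diff above; all type names here are illustrative stand-ins.
interface ProductionHandler {
    enum Response { CONTINUE, FAIL }

    @Deprecated
    Response handle(String record, Exception exception);

    // New context-aware entry point; the default bridges to the deprecated
    // signature so pre-existing implementations keep working.
    default Response handle(ErrorContext context, String record, Exception exception) {
        return handle(record, exception);
    }
}

class ErrorContext {
    final String topic;
    final int partition;
    ErrorContext(final String topic, final int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Counterpart of DefaultProductionExceptionHandler: both overloads FAIL.
class DefaultProductionHandler implements ProductionHandler {
    @Override
    @Deprecated
    public Response handle(final String record, final Exception exception) {
        return Response.FAIL;
    }

    @Override
    public Response handle(final ErrorContext context, final String record, final Exception exception) {
        return Response.FAIL;
    }
}

class ProductionDemo {
    public static void main(final String[] args) {
        final ProductionHandler handler = new DefaultProductionHandler();
        System.out.println(handler.handle(new ErrorContext("topic", 0), "record",
                new RuntimeException("send failed"))); // prints FAIL
    }
}
```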
(kafka) branch trunk updated: MINOR: update flaky CustomHandlerIntegrationTest (#16710)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 3c580e25bfb MINOR: update flaky CustomHandlerIntegrationTest (#16710) 3c580e25bfb is described below commit 3c580e25bfbbba27bd63dc335b6a1405e4ef6ef9 Author: Matthias J. Sax AuthorDate: Tue Jul 30 08:13:59 2024 -0700 MINOR: update flaky CustomHandlerIntegrationTest (#16710) This PR reduces the MAX_BLOCK_MS config from its 60-second default to 10 seconds, to avoid a race condition with the 60-second test timeout. Reviewers: Bill Bejeck --- .../integration/CustomHandlerIntegrationTest.java | 57 +- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java index 3eea2ec7d84..873b2eb922d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.integration; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -58,7 +59,6 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; @Tag("integration") public class CustomHandlerIntegrationTest { private static final int NUM_BROKERS = 1; -private static final int NUM_THREADS = 2; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "false"))); @@ -72,28 +72,27 @@ public class CustomHandlerIntegrationTest
{ CLUSTER.stop(); } -private final long timeout = 6; +private final long timeoutMs = 60_000; // topic name private static final String STREAM_INPUT = "STREAM_INPUT"; private static final String NON_EXISTING_TOPIC = "non_existing_topic"; +private final AtomicReference caughtException = new AtomicReference<>(); + private KafkaStreams kafkaStreams; -AtomicReference caughtException; -Topology topology; +private Topology topology; private String appId; @BeforeEach public void before(final TestInfo testInfo) throws InterruptedException { final StreamsBuilder builder = new StreamsBuilder(); CLUSTER.createTopics(STREAM_INPUT); -caughtException = new AtomicReference<>(); final String safeTestName = safeUniqueTestName(testInfo); appId = "app-" + safeTestName; - builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())) -.to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(), Serdes.String())); +.to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(), Serdes.String())); produceRecords(); topology = builder.build(); } @@ -127,7 +126,7 @@ public class CustomHandlerIntegrationTest { streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); -streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); +streamsConfiguration.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000); return streamsConfiguration; } @@ -147,20 +146,34 @@ public class CustomHandlerIntegrationTest { }); kafkaStreams.start(); TestUtils.waitForCondition( -() -> kafkaStreams.state() == State.RUNNING, -timeout, -() -> "Kafka Streams application did not reach state RUNNING in " + timeout + " ms"); -while (true) { -if (caughtException.get() != null) { -final Throwable throwable = caughtException.get(); -assertInstanceOf(StreamsException.class, 
throwable); -assertInstanceOf(TimeoutException.class, throwable.getCause()); -assertInstanceOf(UnknownTopicOrPartitionException.class, throwable.getCause().getCause()); -closeApplication(streamsConfiguration); -break; -} else { -Thread.sleep(100); -} +() -> kafkaStreams.state() == Sta
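The configuration change above can be illustrated without the Kafka jars: Kafka Streams forwards recognized producer settings in its config through to the embedded producer, so lowering `max.block.ms` (the key behind `ProducerConfig.MAX_BLOCK_MS_CONFIG`) from its 60-second default keeps a blocked `send()` from outliving the 60-second test timeout. A plain-`Properties` sketch, with a placeholder broker address:

```java
import java.util.Properties;

class CustomHandlerTestConfigSketch {
    // Sketch of the streams test configuration; string keys are used instead
    // of the StreamsConfig/ProducerConfig constants so the sketch runs
    // standalone, and "localhost:9092" is a placeholder broker address.
    static Properties streamsConfiguration() {
        final Properties config = new Properties();
        config.put("application.id", "custom-handler-test");  // StreamsConfig.APPLICATION_ID_CONFIG
        config.put("bootstrap.servers", "localhost:9092");    // placeholder
        config.put("max.block.ms", 10_000);                   // lowered from the 60_000 ms default
        return config;
    }

    public static void main(final String[] args) {
        // The producer-level override rides along in the streams config.
        System.out.println(streamsConfiguration().get("max.block.ms")); // prints 10000
    }
}
```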
(kafka) branch trunk updated (2bf7be71ac8 -> 683b8a2beeb)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 2bf7be71ac8 MINOR: Add text and link to blog in announcement template email (#16734) add 683b8a2beeb MINOR: update AdjustStreamThreadCountTest (#16696) No new revisions were added by this update. Summary of changes: .../integration/AdjustStreamThreadCountTest.java | 44 +++--- 1 file changed, 21 insertions(+), 23 deletions(-)
(kafka) branch trunk updated (9e06767ffa8 -> e9d81096587)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 9e06767ffa8 KAFKA-13898 Updated docs for metrics.recording.level (#16402) add e9d81096587 MINOR: simplify code which calls `Punctuator.punctuate()` (#16725) No new revisions were added by this update. Summary of changes: .../apache/kafka/streams/processor/internals/ProcessorNode.java| 7 +-- .../org/apache/kafka/streams/processor/internals/StreamTask.java | 4 ++-- 2 files changed, 3 insertions(+), 8 deletions(-)
(kafka) branch trunk updated: KAFKA-16448: Handle fatal user exception during processing error (#16675)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0dc9b9e4eec KAFKA-16448: Handle fatal user exception during processing error (#16675) 0dc9b9e4eec is described below commit 0dc9b9e4eec124df9f5c92b41db2b4fb7bd49600 Author: Sebastien Viale AuthorDate: Wed Jul 31 07:58:07 2024 +0200 KAFKA-16448: Handle fatal user exception during processing error (#16675) This PR is part of KIP-1033, which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR catches the exceptions thrown while handling a processing exception. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- .../streams/processor/internals/ProcessorNode.java | 8 +- .../ProcessingExceptionHandlerIntegrationTest.java | 146 + .../processor/internals/ProcessorNodeTest.java | 31 - 3 files changed, 179 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 08da872bb73..3ccfcf24905 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -212,9 +212,13 @@ public class ProcessorNode { internalProcessorContext.currentNode().name(), internalProcessorContext.taskId()); -final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler -.handle(errorHandlerContext, record, e); +final ProcessingExceptionHandler.ProcessingHandlerResponse response; +try { +response = processingExceptionHandler.handle(errorHandlerContext, record, e); +} catch (final Exception fatalUserException) { +throw new
FailedProcessingException(fatalUserException); +} if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { log.error("Processing exception handler is set to fail upon" + " a processing error. If you would rather have the streaming pipeline" + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index 6c1a64344ec..61b5ed16bb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -45,10 +45,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -149,9 +151,150 @@ public class ProcessingExceptionHandlerIntegrationTest { } } +@Test +public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() { +final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); +final KeyValue eventError = new KeyValue<>("ID123-2-ERR", "ID123-A2"); + +final MockProcessorSupplier processor = new MockProcessorSupplier<>(); +final StreamsBuilder builder = new StreamsBuilder(); +final AtomicBoolean isExecuted = new AtomicBoolean(false); +builder +.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) +.map(KeyValue::new) +.mapValues(value -> 
value) +.process(runtimeErrorProcessorSupplierMock()) +.map((k, v) -> { +isExecuted.set(true); +return KeyValue.pair(k, v); +}) +.process(processor); + +final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + +try (final TopologyTestDriver driver = new T
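The guard added in the diff above — wrapping any exception thrown by the user-supplied handler in a dedicated `FailedProcessingException` — can be sketched standalone. The types below are simplified stand-ins, not the real Kafka Streams classes; the point is distinguishing "the handler itself failed" from "processing failed":

```java
// Simplified sketch of the handler-invocation guard; hypothetical types.
class FailedProcessingException extends RuntimeException {
    FailedProcessingException(final Throwable cause) {
        super(cause);
    }
}

interface ProcessingHandler {
    enum Response { CONTINUE, FAIL }
    Response handle(String record, Exception processingError);
}

class ProcessorNodeSketch {
    static ProcessingHandler.Response invokeHandler(final ProcessingHandler handler,
                                                    final String record,
                                                    final Exception processingError) {
        try {
            return handler.handle(record, processingError);
        } catch (final Exception fatalUserException) {
            // The handler is user code; never let its own failure masquerade
            // as an ordinary processing error the handler could swallow.
            throw new FailedProcessingException(fatalUserException);
        }
    }

    public static void main(final String[] args) {
        final ProcessingHandler broken = (record, err) -> {
            throw new IllegalStateException("bug in user handler");
        };
        try {
            invokeHandler(broken, "record", new RuntimeException("processing failed"));
        } catch (final FailedProcessingException e) {
            System.out.println(e.getCause().getMessage()); // prints: bug in user handler
        }
    }
}
```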
(kafka) branch 3.9 updated: KAFKA-16448: Handle fatal user exception during processing error (#16675)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.9 by this push: new c8dc09c2659 KAFKA-16448: Handle fatal user exception during processing error (#16675) c8dc09c2659 is described below commit c8dc09c2659bb6309d97c692c907c076898b4aeb Author: Sebastien Viale AuthorDate: Wed Jul 31 07:58:07 2024 +0200 KAFKA-16448: Handle fatal user exception during processing error (#16675) This PR is part of KIP-1033, which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR catches the exceptions thrown while handling a processing exception. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- .../streams/processor/internals/ProcessorNode.java | 8 +- .../ProcessingExceptionHandlerIntegrationTest.java | 146 + .../processor/internals/ProcessorNodeTest.java | 31 - 3 files changed, 179 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 763edc9a045..175c9e104ef 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -213,9 +213,13 @@ public class ProcessorNode { internalProcessorContext.currentNode().name(), internalProcessorContext.taskId()); -final ProcessingExceptionHandler.ProcessingHandlerResponse response = processingExceptionHandler -.handle(errorHandlerContext, record, e); +final ProcessingExceptionHandler.ProcessingHandlerResponse response; +try { +response = processingExceptionHandler.handle(errorHandlerContext, record, e); +} catch (final Exception fatalUserException) { +throw new
FailedProcessingException(fatalUserException); +} if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) { log.error("Processing exception handler is set to fail upon" + " a processing error. If you would rather have the streaming pipeline" + diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java index 6c1a64344ec..61b5ed16bb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java @@ -45,10 +45,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertIterableEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -149,9 +151,150 @@ public class ProcessingExceptionHandlerIntegrationTest { } } +@Test +public void shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler() { +final KeyValue event = new KeyValue<>("ID123-1", "ID123-A1"); +final KeyValue eventError = new KeyValue<>("ID123-2-ERR", "ID123-A2"); + +final MockProcessorSupplier processor = new MockProcessorSupplier<>(); +final StreamsBuilder builder = new StreamsBuilder(); +final AtomicBoolean isExecuted = new AtomicBoolean(false); +builder +.stream("TOPIC_NAME", Consumed.with(Serdes.String(), Serdes.String())) +.map(KeyValue::new) +.mapValues(value -> 
value) +.process(runtimeErrorProcessorSupplierMock()) +.map((k, v) -> { +isExecuted.set(true); +return KeyValue.pair(k, v); +}) +.process(processor); + +final Properties properties = new Properties(); + properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, FailProcessingExceptionHandlerMockTest.class); + +try (final TopologyTestDriver driver = new T
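The commit above wraps the call into the user-supplied `ProcessingExceptionHandler` in a try/catch so that a handler that itself throws is surfaced as a dedicated wrapping exception. A minimal, self-contained sketch of that pattern — the interfaces here are heavily simplified stand-ins for the real KIP-1033 types (the real `handle()` takes an `ErrorHandlerContext` and a `Record`), not the actual Kafka Streams API:

```java
// Simplified stand-in for the KIP-1033 handler contract (assumption: the real
// interface lives in org.apache.kafka.streams.errors and has a richer signature).
interface ProcessingExceptionHandler {
    enum Response { CONTINUE, FAIL }
    Response handle(String record, Exception processingError);
}

// Stand-in for the wrapping exception thrown when the handler itself fails.
class FailedProcessingException extends RuntimeException {
    FailedProcessingException(final Throwable cause) {
        super(cause);
    }
}

public class HandlerGuard {
    static ProcessingExceptionHandler.Response guardedHandle(
            final ProcessingExceptionHandler handler,
            final String record,
            final Exception processingError) {
        try {
            return handler.handle(record, processingError);
        } catch (final Exception fatalUserException) {
            // An exception escaping the user handler is a fatal user error:
            // wrap it instead of letting it propagate raw.
            throw new FailedProcessingException(fatalUserException);
        }
    }

    public static void main(final String[] args) {
        final ProcessingExceptionHandler throwing = (record, error) -> {
            throw new IllegalStateException("handler bug");
        };
        try {
            guardedHandle(throwing, "some-record", new RuntimeException("processing failed"));
        } catch (final FailedProcessingException e) {
            System.out.println("wrapped: " + e.getCause().getMessage()); // prints: wrapped: handler bug
        }
    }
}
```

The integration test added by the commit asserts exactly this: a handler that throws stops the topology instead of being silently swallowed.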
(kafka) branch 3.9 updated: Revert "KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450)" (#16738)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.9 by this push: new ccb04acb56b Revert "KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450)" (#16738) ccb04acb56b is described below commit ccb04acb56b5a29f6de4dfc7b98897404eb14c0c Author: Matthias J. Sax AuthorDate: Wed Jul 31 10:29:02 2024 -0700 Revert "KAFKA-16508: Streams custom handler should handle the timeout exceptions (#16450)" (#16738) This reverts commit 15a4501bded513822485dd85fa6258e16f1571ca. We consider this change backward incompatible and will fix forward for 4.0 release via KIP-1065, but need to revert for 3.9 release. Reviewers: Josep Prat , Bill Bejeck --- .../processor/internals/RecordCollectorImpl.java | 13 +- .../integration/CustomHandlerIntegrationTest.java | 166 - .../processor/internals/RecordCollectorTest.java | 50 --- 3 files changed, 1 insertion(+), 228 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index de4afc2c924..42b8d4f082b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -34,7 +34,6 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownServerException; -import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serializer; @@ -389,7 +388,7 @@ public 
class RecordCollectorImpl implements RecordCollector { "indicating the task may be migrated out"; sendException.set(new TaskMigratedException(errorMessage, exception)); } else { -if (isRetriable(exception)) { +if (exception instanceof RetriableException) { errorMessage += "\nThe broker is either slow or in bad state (like not having enough replicas) in responding the request, " + "or the connection to broker was interrupted sending the request or receiving the response. " + "\nConsider overwriting `max.block.ms` and /or " + @@ -419,16 +418,6 @@ public class RecordCollectorImpl implements RecordCollector { log.error(errorMessage, exception); } -/** - * The `TimeoutException` with root cause `UnknownTopicOrPartitionException` is considered as non-retriable - * (despite `TimeoutException` being a subclass of `RetriableException`, this particular case is explicitly excluded). -*/ -private boolean isRetriable(final Exception exception) { -return exception instanceof RetriableException && -(!(exception instanceof TimeoutException) || exception.getCause() == null -|| !(exception.getCause() instanceof UnknownTopicOrPartitionException)); -} - private boolean isFatalException(final Exception exception) { final boolean securityException = exception instanceof AuthenticationException || exception instanceof AuthorizationException || diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java deleted file mode 100644 index 3eea2ec7d84..000 --- a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apa
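The revert above removes the special-case `isRetriable` check, restoring the plain `exception instanceof RetriableException` test. The removed logic treated a `TimeoutException` caused by an `UnknownTopicOrPartitionException` as non-retriable even though `TimeoutException` extends `RetriableException`. A self-contained sketch of that predicate — the exception classes below are local stand-ins for the `org.apache.kafka.common.errors` types, so this compiles without Kafka on the classpath:

```java
// Local stand-ins mirroring the Kafka exception hierarchy (assumption:
// simplified; the real classes live in org.apache.kafka.common.errors).
class RetriableException extends RuntimeException {
    RetriableException(final Throwable cause) {
        super(cause);
    }
}

class TimeoutException extends RetriableException {
    TimeoutException(final Throwable cause) {
        super(cause);
    }
}

class UnknownTopicOrPartitionException extends RuntimeException { }

public class RetriableCheck {
    // The reverted check: retriable, unless it is a TimeoutException whose
    // root cause is UnknownTopicOrPartitionException.
    static boolean isRetriable(final Exception exception) {
        return exception instanceof RetriableException
            && (!(exception instanceof TimeoutException)
                || exception.getCause() == null
                || !(exception.getCause() instanceof UnknownTopicOrPartitionException));
    }

    public static void main(final String[] args) {
        System.out.println(isRetriable(new TimeoutException(null)));                                   // true
        System.out.println(isRetriable(new TimeoutException(new UnknownTopicOrPartitionException()))); // false
    }
}
```

After the revert, the second case is treated as retriable again on 3.9; the forward fix is deferred to KIP-1065 for the 4.0 release, as the commit message states.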
(kafka) branch trunk updated: MINOR: update EosIntegrationTest (#16697)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1528264f021 MINOR: update EosIntegrationTest (#16697) 1528264f021 is described below commit 1528264f0217820a86f01964219dbe11b3a60c0e Author: Matthias J. Sax AuthorDate: Wed Jul 31 11:25:44 2024 -0700 MINOR: update EosIntegrationTest (#16697) Refactor test to move off deprecated `transform()` in favor of `process()`. Reviewers: Bill Bejeck --- .../streams/integration/EosIntegrationTest.java| 125 ++--- 1 file changed, 59 insertions(+), 66 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 38e7e5cc0ca..94e48ee3d49 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -43,12 +43,10 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.query.QueryResult; @@ -863,8 +861,8 @@ public class 
EosIntegrationTest { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); -streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); +streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -901,10 +899,10 @@ public class EosIntegrationTest { .addSource("source", MULTI_PARTITION_INPUT_TOPIC) .addProcessor("processor", () -> new Processor() { KeyValueStore stateStore; - org.apache.kafka.streams.processor.api.ProcessorContext context; +ProcessorContext context; @Override -public void init(final org.apache.kafka.streams.processor.api.ProcessorContext context) { +public void init(final ProcessorContext context) { Processor.super.init(context); this.context = context; stateStore = context.getStateStore(stateStoreName); @@ -1014,7 +1012,7 @@ public class EosIntegrationTest { final long topicEndOffset = consumer.position(tp); assertTrue(topicEndOffset >= checkpointedOffset, -"changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset); +"changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset); consumer.seekToBeginning(partitions); @@ -1070,80 +1068,75 @@ public class EosIntegrationTest { } final KStream input = builder.stream(MULTI_PARTITION_INPUT_TOPIC); -input.transform(new TransformerSupplier>() { 
-@SuppressWarnings("unchecked") -@Override -public Transformer> get() { -return new Transformer>() { -ProcessorContext context; -KeyValueStore state = null; +input.process(() -> new Processor() { +ProcessorContext context; +KeyValueStore state = null; -@Override -
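Alongside the `transform()`-to-`process()` move, the diff above also swaps serde configuration from a serde instance's runtime class to the serde class literal. A before/after fragment (identifiers exactly as they appear in the diff; not runnable on its own since it needs the Kafka Streams classes):

```java
// before: instantiate a Serde only to ask for its class
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
// after: pass the class literal directly -- same configured serde, no throwaway instance
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
```

Both forms configure the same serde class; the literal form is simply more direct. The same change recurs in the other test refactorings in this batch.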
(kafka) branch trunk updated: MINOR: update EosV2UpgradeIntegrationTest (#16698)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new d6a41ac3ca3 MINOR: update EosV2UpgradeIntegrationTest (#16698) d6a41ac3ca3 is described below commit d6a41ac3ca3bbd733c3385861306cf04f2661a22 Author: Matthias J. Sax AuthorDate: Wed Jul 31 11:26:25 2024 -0700 MINOR: update EosV2UpgradeIntegrationTest (#16698) Refactor test to move off deprecated `transform()` in favor of `process()`. Reviewers: Bill Bejeck --- .../integration/EosV2UpgradeIntegrationTest.java | 97 ++ 1 file changed, 46 insertions(+), 51 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java index 087a439d084..8653f69fca6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java @@ -40,9 +40,9 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils.StableAssignmentListener; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ 
-847,7 +847,6 @@ public class EosV2UpgradeIntegrationTest { } } -@SuppressWarnings("deprecation") private KafkaStreams getKafkaStreams(final String appDir, final String processingGuarantee, final boolean injectError) { @@ -861,62 +860,58 @@ public class EosV2UpgradeIntegrationTest { builder.addStateStore(storeBuilder); final KStream input = builder.stream(MULTI_PARTITION_INPUT_TOPIC); -input.transform(new TransformerSupplier>() { -@Override -public Transformer> get() { -return new Transformer>() { -ProcessorContext context; -KeyValueStore state = null; -AtomicBoolean crash; -AtomicInteger sharedCommit; - -@Override -public void init(final ProcessorContext context) { -this.context = context; -state = context.getStateStore(storeName); -final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString(); -if (APP_DIR_1.equals(clientId)) { -crash = errorInjectedClient1; -sharedCommit = commitCounterClient1; -} else { -crash = errorInjectedClient2; -sharedCommit = commitCounterClient2; -} +input.process(() -> new Processor() { +ProcessorContext context; +KeyValueStore state = null; +AtomicBoolean crash; +AtomicInteger sharedCommit; + +@Override +public void init(final ProcessorContext context) { +this.context = context; +state = context.getStateStore(storeName); +final String clientId = context.appConfigs().get(StreamsConfig.CLIENT_ID_CONFIG).toString(); +if (APP_DIR_1.equals(clientId)) { +crash = errorInjectedClient1; +sharedCommit = commitCounterClient1; +} else { +crash = errorInjectedClient2; +sharedCommit = commitCounterClient2; } +} -@Override -public KeyValue transform(final Long key, final Long value) { -if ((value + 1) % 10 == 0) { -if (sharedCommit.get() < 0 || -sharedCommit.incrementAndGet() == 2) { +@Override +public void process(final Record record) { +fi
(kafka) branch trunk updated: MINOR: update KStreamAggregationIntegrationTest (#16699)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 08b3d9f37b0 MINOR: update KStreamAggregationIntegrationTest (#16699) 08b3d9f37b0 is described below commit 08b3d9f37b0682078ac1d273497fb88cebfc9528 Author: Matthias J. Sax AuthorDate: Wed Jul 31 11:27:09 2024 -0700 MINOR: update KStreamAggregationIntegrationTest (#16699) Refactor test to move off deprecated `transform()` in favor of `process()`. Reviewers: Bill Bejeck --- .../KStreamAggregationIntegrationTest.java | 781 ++--- 1 file changed, 386 insertions(+), 395 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index a9b63f66d29..d4a53f1b222 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -48,14 +48,13 @@ import org.apache.kafka.streams.kstream.SessionWindows; import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.UnlimitedWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.WindowedSerdes; import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; import 
org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlySessionStore; @@ -74,7 +73,6 @@ import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.PrintStream; import java.time.Duration; import java.util.Arrays; @@ -99,7 +97,6 @@ import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -@SuppressWarnings({"unchecked", "deprecation"}) @Timeout(600) @Tag("integration") public class KStreamAggregationIntegrationTest { @@ -108,7 +105,7 @@ public class KStreamAggregationIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); @BeforeAll -public static void startCluster() throws IOException { +public static void startCluster() throws Exception { CLUSTER.start(); } @@ -132,7 +129,7 @@ public class KStreamAggregationIntegrationTest { private KStream stream; @BeforeEach -public void before(final TestInfo testInfo) throws InterruptedException { +public void before(final TestInfo testInfo) throws Exception { builder = new StreamsBuilder(); final String safeTestName = safeUniqueTestName(testInfo); createTopics(safeTestName); @@ -143,8 +140,8 @@ public class KStreamAggregationIntegrationTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); -streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); +streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); 
+ streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); final KeyValueMapper mapper = MockMapper.selectValueMapper(); stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String())); @@ -156,7 +153,7 @@ public class KStreamAggregationIntegrationTest { } @AfterEach -public void whenShuttingDown() throws IOException { +public void whenShuttingDown() throws Exception { if (kafkaStreams != null) { kafkaStreams.close(); } @@ -183,21 +180,25 @@ public class KStreamAggregationIntegrationTest { results.sort(KStreamAggregationIntegrationTest::compare); -assertThat(results, is(A
(kafka) branch trunk updated: MINOR: update StandbyTaskCreationIntegrationTest (#16700)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6224feee656 MINOR: update StandbyTaskCreationIntegrationTest (#16700) 6224feee656 is described below commit 6224feee656303d66467647f4a71f0a3daa3b2b8 Author: Matthias J. Sax AuthorDate: Wed Jul 31 11:27:38 2024 -0700 MINOR: update StandbyTaskCreationIntegrationTest (#16700) Refactor test to move off deprecated `transform()` in favor of `process()`. Reviewers: Bill Bejeck --- .../StandbyTaskCreationIntegrationTest.java| 35 +- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index d2002f1a76e..aa08da6da26 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -19,7 +19,6 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.ThreadMetadata; @@ -27,8 +26,8 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import 
org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -87,20 +86,19 @@ public class StandbyTaskCreationIntegrationTest { client2.close(); } -private Properties streamsConfiguration(final TestInfo testInfo) { +private Properties streamsConfiguration() { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); -streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); +streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class); streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); return streamsConfiguration; } @Test -@SuppressWarnings("deprecation") -public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final TestInfo testInfo) throws Exception { +public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); final String stateStoreName = "myTransformState"; final StoreBuilder> keyValueStoreBuilder = @@ -109,21 +107,16 @@ public class StandbyTaskCreationIntegrationTest { Serdes.Integer()).withLoggingDisabled(); builder.addStateStore(keyValueStoreBuilder); builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer())) -.transform(() -> new Transformer>() { +// don't use method reference below as it won't create a new `Processor` 
instance but re-use the same object +.process(() -> new Processor() { @Override -public void init(final ProcessorContext context) {} - -@Override -public KeyValue transform(final Integer key, final Integer value) { -return null; +public void process(final Record record) { +// no-op } - -@Override -public void close() {} }, stateStoreName); final Topology topology = builder.build(); -c
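The comment added in the diff above ("don't use method reference below as it won't create a new `Processor` instance but re-use the same object") is about supplier semantics: `process()` takes a supplier that must yield a fresh `Processor` per call, and a supplier that hands back one shared object leaks mutable state across tasks. A self-contained sketch of the difference, using `java.util.function.Supplier` and a hypothetical `CountingProcessor` in place of the real Kafka types:

```java
import java.util.function.Supplier;

public class SupplierSemantics {
    // Hypothetical stateful processor stand-in.
    static final class CountingProcessor {
        int seen = 0;

        void process(final String record) {
            seen++;
        }
    }

    public static void main(final String[] args) {
        final CountingProcessor shared = new CountingProcessor();
        // Broken: the supplier captures one object and returns it every time.
        final Supplier<CountingProcessor> reusesOneObject = () -> shared;
        // Correct: a constructor reference yields a distinct instance per call.
        final Supplier<CountingProcessor> freshPerCall = CountingProcessor::new;

        reusesOneObject.get().process("a");
        reusesOneObject.get().process("b");
        System.out.println(shared.seen); // prints: 2 -> state leaked across "tasks"

        System.out.println(freshPerCall.get() == freshPerCall.get()); // prints: false
    }
}
```

In a Kafka Streams topology the broken variant would mean two stream tasks mutate the same processor fields, which is exactly what the test comment warns against.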
(kafka) branch trunk updated: MINOR: update SuppressionDurabilityIntegrationTest (#16740)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8f2679bebf6 MINOR: update SuppressionDurabilityIntegrationTest (#16740) 8f2679bebf6 is described below commit 8f2679bebf6d5cb59065d2ab9b9d5604c72175b3 Author: Matthias J. Sax AuthorDate: Wed Jul 31 11:28:09 2024 -0700 MINOR: update SuppressionDurabilityIntegrationTest (#16740) Refactor test to move off deprecated `transform()` in favor of `process()`. Reviewers: Bill Bejeck --- .../SuppressionDurabilityIntegrationTest.java | 33 +- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index 497b98ae354..37655aebce7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -38,9 +37,10 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import 
org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.TestUtils; @@ -141,12 +141,12 @@ public class SuppressionDurabilityIntegrationTest { final MetadataValidator metadataValidator = new MetadataValidator(input); suppressedCounts -.transform(metadataValidator) +.process(metadataValidator) .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); valueCounts .toStream() -.transform(metadataValidator) +.process(metadataValidator) .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); final Properties streamsConfig = mkProperties(mkMap( @@ -252,7 +252,7 @@ public class SuppressionDurabilityIntegrationTest { } } -private static final class MetadataValidator implements TransformerSupplier> { +private static final class MetadataValidator implements ProcessorSupplier { private static final Logger LOG = LoggerFactory.getLogger(MetadataValidator.class); private final AtomicReference firstException = new AtomicReference<>(); private final String topic; @@ -262,29 +262,24 @@ public class SuppressionDurabilityIntegrationTest { } @Override -public Transformer> get() { -return new Transformer>() { -private ProcessorContext context; +public Processor get() { +return new Processor() { +private ProcessorContext context; @Override -public void init(final ProcessorContext context) { +public void init(final ProcessorContext context) { this.context = context; } @Override -public KeyValue transform(final String key, final Long value) { +public void process(final Record record) { try { -assertThat(context.topic(), equalTo(topic)); +assertThat(context.recordMetadata().get().topic(), equalTo(topic)); } catch (final Throwable e) { 
firstException.compareAndSet(null, e); LOG.error("Validation Failed", e); } -return new KeyValue<>(key, value); -} - -@Override -public void close() { - +context.forward(record); } }; }
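All of the refactorings in this batch follow the same shape: the deprecated `Transformer`, which returned a `KeyValue` from `transform()`, becomes an `api.Processor` that receives a `Record` and emits results via `context.forward()`. A self-contained sketch of the new contract — `Record`, `Processor`, and `ProcessorContext` below are heavily simplified stand-ins for the `org.apache.kafka.streams.processor.api` types, so this runs without Kafka:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for the processor.api types (assumption: the real
// ProcessorContext carries far more, e.g. state stores and record metadata).
class Record<K, V> {
    final K key;
    final V value;

    Record(final K key, final V value) {
        this.key = key;
        this.value = value;
    }
}

interface ProcessorContext<KOut, VOut> {
    void forward(Record<KOut, VOut> record);
}

interface Processor<KIn, VIn, KOut, VOut> {
    default void init(final ProcessorContext<KOut, VOut> context) { }
    void process(Record<KIn, VIn> record);
    default void close() { }
}

public class TransformToProcess {
    // New-style processor: instead of returning a KeyValue (the old
    // Transformer contract), it forwards zero or more records downstream.
    static class UpperCaser implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            context.forward(new Record<>(record.key, record.value.toUpperCase()));
        }
    }

    public static void main(final String[] args) {
        final List<Record<String, String>> forwarded = new ArrayList<>();
        final Processor<String, String, String, String> processor = new UpperCaser();
        processor.init(forwarded::add); // the list plays the role of the downstream node
        processor.process(new Record<>("k1", "hello"));
        System.out.println(forwarded.get(0).value); // prints: HELLO
    }
}
```

Forwarding via the context also explains the `MetadataValidator` change in the diff above: topic metadata now comes from `context.recordMetadata()` rather than from `ProcessorContext.topic()`.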
(kafka) branch trunk updated: KAFKA-16448: Unify class cast exception handling for both key and value (#16736)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 0eb9ac2bd08 KAFKA-16448: Unify class cast exception handling for both key and value (#16736) 0eb9ac2bd08 is described below commit 0eb9ac2bd080800f300a1bd28c75bf84e793757c Author: Loïc GREFFIER AuthorDate: Wed Jul 31 22:24:15 2024 +0200 KAFKA-16448: Unify class cast exception handling for both key and value (#16736) Part of KIP-1033. Minor code cleanup. Reviewers: Matthias J. Sax --- .../processor/internals/RecordCollectorImpl.java | 52 +- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index de4afc2c924..a4dc0a68062 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -53,9 +53,11 @@ import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics; import org.slf4j.Logger; +import java.text.MessageFormat; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -200,7 +202,8 @@ public class RecordCollectorImpl implements RecordCollector { try { keyBytes = keySerializer.serialize(topic, headers, key); } catch (final ClassCastException exception) { -throw createStreamsExceptionForKeyClassCastException( +throw createStreamsExceptionForClassCastException( +ProductionExceptionHandler.SerializationExceptionOrigin.KEY, topic, key, keySerializer, @@ -223,7 +226,8 @@ public class RecordCollectorImpl implements RecordCollector { try { valBytes = 
valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { -throw createStreamsExceptionForValueClassCastException( +throw createStreamsExceptionForClassCastException( +ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, topic, value, valueSerializer, @@ -335,39 +339,27 @@ public class RecordCollectorImpl implements RecordCollector { droppedRecordsSensor.record(); } -private StreamsException createStreamsExceptionForKeyClassCastException(final String topic, - final K key, - final Serializer keySerializer, - final ClassCastException exception) { -final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); -return new StreamsException( -String.format( -"ClassCastException while producing data to topic %s. " + -"The key serializer %s is not compatible to the actual key type: %s. " + -"Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + -"(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", -topic, -keySerializer.getClass().getName(), -keyClass), -exception); -} -private StreamsException createStreamsExceptionForValueClassCastException(final String topic, - final V value, - final Serializer valueSerializer, - final ClassCastException exception) { -final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); +private StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin, + final String topic, +
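The cleanup above collapses two near-duplicate factory methods (one for key and one for value serialization failures) into a single method parameterized by a `SerializationExceptionOrigin` enum. A self-contained sketch of the unified shape — the message text is a shortened paraphrase of the error string in the diff, and only the enum name is taken from the real API:

```java
import java.util.Locale;

public class ClassCastMessage {
    // Enum name taken from the diff above; values are KEY and VALUE.
    enum SerializationExceptionOrigin { KEY, VALUE }

    // One method replaces the former per-key and per-value duplicates;
    // the origin drives both the wording and the "null" fallback.
    static String message(final SerializationExceptionOrigin origin,
                          final String topic,
                          final Object data,
                          final String serializerClass) {
        final String originName = origin.name().toLowerCase(Locale.ROOT);
        final String dataClass = data == null
            ? "unknown because " + originName + " is null"
            : data.getClass().getName();
        return String.format(
            "ClassCastException while producing data to topic %s. "
                + "The %s serializer %s is not compatible to the actual %s type: %s.",
            topic, originName, serializerClass, originName, dataClass);
    }

    public static void main(final String[] args) {
        System.out.println(message(SerializationExceptionOrigin.KEY, "out-topic", 42,
            "org.apache.kafka.common.serialization.StringSerializer"));
    }
}
```

The same commit lands on both trunk and 3.9 (see the next email), which keeps the two branches aligned for the KIP-1033 work built on top of it.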
(kafka) branch 3.9 updated: KAFKA-16448: Unify class cast exception handling for both key and value (#16736)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.9 by this push: new aaed1bdd891 KAFKA-16448: Unify class cast exception handling for both key and value (#16736) aaed1bdd891 is described below commit aaed1bdd8911be329123c7f675deb6875ebf55bd Author: Loïc GREFFIER AuthorDate: Wed Jul 31 22:24:15 2024 +0200 KAFKA-16448: Unify class cast exception handling for both key and value (#16736) Part of KIP-1033. Minor code cleanup. Reviewers: Matthias J. Sax --- .../processor/internals/RecordCollectorImpl.java | 52 +- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 42b8d4f082b..7a8b77b8a5b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -52,9 +52,11 @@ import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics; import org.slf4j.Logger; +import java.text.MessageFormat; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -199,7 +201,8 @@ public class RecordCollectorImpl implements RecordCollector { try { keyBytes = keySerializer.serialize(topic, headers, key); } catch (final ClassCastException exception) { -throw createStreamsExceptionForKeyClassCastException( +throw createStreamsExceptionForClassCastException( +ProductionExceptionHandler.SerializationExceptionOrigin.KEY, topic, key, keySerializer, @@ -222,7 +225,8 @@ public class RecordCollectorImpl implements RecordCollector { try { valBytes = 
valueSerializer.serialize(topic, headers, value); } catch (final ClassCastException exception) { -throw createStreamsExceptionForValueClassCastException( +throw createStreamsExceptionForClassCastException( +ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, topic, value, valueSerializer, @@ -334,39 +338,27 @@ public class RecordCollectorImpl implements RecordCollector { droppedRecordsSensor.record(); } -private StreamsException createStreamsExceptionForKeyClassCastException(final String topic, - final K key, - final Serializer keySerializer, - final ClassCastException exception) { -final String keyClass = key == null ? "unknown because key is null" : key.getClass().getName(); -return new StreamsException( -String.format( -"ClassCastException while producing data to topic %s. " + -"The key serializer %s is not compatible to the actual key type: %s. " + -"Change the default key serde in StreamConfig or provide the correct key serde via method parameters " + -"(for example if using the DSL, `#to(String topic, Produced produced)` with " + - "`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).", -topic, -keySerializer.getClass().getName(), -keyClass), -exception); -} -private StreamsException createStreamsExceptionForValueClassCastException(final String topic, - final V value, - final Serializer valueSerializer, - final ClassCastException exception) { -final String valueClass = value == null ? "unknown because value is null" : value.getClass().getName(); +private StreamsException createStreamsExceptionForClassCastException(final ProductionExceptionHandler.SerializationExceptionOrigin origin, + final String topic, +
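The commit above collapses two near-identical helpers (`createStreamsExceptionForKeyClassCastException` and `createStreamsExceptionForValueClassCastException`) into one `createStreamsExceptionForClassCastException` parameterized by a `SerializationExceptionOrigin`. The following is a minimal, self-contained sketch of that unification pattern only — all types here are simplified stand-ins, not Kafka's actual classes, and the message text is abbreviated from the real one:

```java
// Sketch of the "unify key/value class-cast handling" pattern: one factory
// method keyed on an origin enum replaces separate key and value variants.
// StreamsException and the enum below are stand-ins, not Kafka's real types.
public class ClassCastUnificationSketch {

    enum SerializationExceptionOrigin { KEY, VALUE }

    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static StreamsException createStreamsExceptionForClassCastException(
            final SerializationExceptionOrigin origin,
            final String topic,
            final Object keyOrValue,
            final String serializerClassName,
            final ClassCastException exception) {
        // The only difference between the old key/value helpers was the word
        // "key" vs "value" and which object's class gets reported.
        final String kind = origin == SerializationExceptionOrigin.KEY ? "key" : "value";
        final String actualClass = keyOrValue == null
            ? "unknown because " + kind + " is null"
            : keyOrValue.getClass().getName();
        return new StreamsException(
            String.format(
                "ClassCastException while producing data to topic %s. " +
                "The %s serializer %s is not compatible to the actual %s type: %s.",
                topic, kind, serializerClassName, kind, actualClass),
            exception);
    }

    public static void main(final String[] args) {
        final StreamsException e = createStreamsExceptionForClassCastException(
            SerializationExceptionOrigin.KEY, "orders", 42, "StringSerializer",
            new ClassCastException("Integer cannot be cast to String"));
        System.out.println(e.getMessage());
    }
}
```

The design point of the real change is the same as in the sketch: both call sites (key and value serialization) now funnel into one helper and differ only in the origin argument they pass.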
(kafka) branch trunk updated: KAFKA-16448: Handle processing exceptions in punctuate (#16300)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new f1ef7a5a9f8 KAFKA-16448: Handle processing exceptions in punctuate (#16300) f1ef7a5a9f8 is described below commit f1ef7a5a9f87cda7ae1209fc2285259eba4204ca Author: Sebastien Viale AuthorDate: Thu Aug 1 00:53:47 2024 +0200 KAFKA-16448: Handle processing exceptions in punctuate (#16300) This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR actually catches processing exceptions from punctuate. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- checkstyle/suppressions.xml| 2 +- .../streams/processor/internals/StreamTask.java| 44 ++- .../processor/internals/StreamTaskTest.java| 137 - 3 files changed, 176 insertions(+), 7 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ba28341d1a3..8c96ce71237 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -193,7 +193,7 @@ + files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8b253c6e16a..6f2edd442b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -29,12 +29,14 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.TopologyConfig.TaskConfig; import 
org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.errors.internals.FailedProcessingException; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; @@ -63,6 +65,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.singleton; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor; @@ -101,6 +104,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final Sensor restoreRemainingSensor; private final Sensor punctuateLatencySensor; private final Sensor bufferedRecordsSensor; +private final Sensor droppedRecordsSensor; private final Map e2eLatencySensors = new HashMap<>(); private final RecordQueueCreator recordQueueCreator; @@ -160,6 +164,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics); punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics); bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics); +droppedRecordsSensor = 
TaskMetrics.droppedRecordsSensor(threadId, taskId, streamsMetrics); for (final String terminalNodeName : topology.terminalNodes()) { e2eLatencySensors.put( @@ -915,15 +920,48 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, try { maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor); -} catch (final StreamsException e) { +} catch (final FailedProcessingException e) { +throw createStreamsException(node.name(), e.getCause()); +} catch (final TaskCorruptedException | TaskMigratedException e) { throw e; -} catch (final RuntimeException e) {
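Per the commit message above, KIP-1033 routes exceptions thrown from `Punctuator.punctuate()` to the configured `ProcessingExceptionHandler`, which decides whether to drop and continue or to fail the task. A hedged, self-contained sketch of that control flow follows — the types are stand-ins, not Kafka's real classes, and the real implementation also distinguishes `FailedProcessingException`, `TaskCorruptedException`, and `TaskMigratedException` as shown in the diff:

```java
// Sketch of punctuate error handling: a RuntimeException from punctuate()
// goes to a handler; FAIL wraps it in a StreamsException-like error, while
// CONTINUE records a dropped punctuation and moves on. Stand-in types only.
import java.util.function.LongConsumer;

public class PunctuateHandlerSketch {

    enum Response { CONTINUE, FAIL }

    interface ProcessingExceptionHandler {
        Response handle(String nodeName, Exception exception);
    }

    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    private final ProcessingExceptionHandler handler;
    private long droppedRecords = 0;  // stands in for the droppedRecordsSensor

    PunctuateHandlerSketch(final ProcessingExceptionHandler handler) {
        this.handler = handler;
    }

    void punctuate(final String nodeName, final long timestamp, final LongConsumer punctuator) {
        try {
            punctuator.accept(timestamp);
        } catch (final RuntimeException e) {
            if (handler.handle(nodeName, e) == Response.FAIL) {
                throw new StreamsException(
                    String.format("Exception caught while punctuating processor '%s'", nodeName), e);
            }
            droppedRecords++;  // CONTINUE: count the dropped punctuation
        }
    }

    long droppedRecords() {
        return droppedRecords;
    }

    public static void main(final String[] args) {
        final PunctuateHandlerSketch task =
            new PunctuateHandlerSketch((node, e) -> Response.CONTINUE);
        task.punctuate("my-node", 0L, ts -> { throw new IllegalStateException("boom"); });
        System.out.println(task.droppedRecords()); // 1
    }
}
```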
(kafka) branch 3.9 updated (aaed1bdd891 -> 578fef23558)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from aaed1bdd891  KAFKA-16448: Unify class cast exception handling for both key and value (#16736)
     new 2c957a6e5c1  MINOR: simplify code which calles `Punctuator.punctuate()` (#16725)
     new 578fef23558  KAFKA-16448: Handle processing exceptions in punctuate (#16300)

The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 checkstyle/suppressions.xml                        |   2 +-
 .../streams/processor/internals/ProcessorNode.java |   7 +-
 .../streams/processor/internals/StreamTask.java    |  48 +++-
 .../processor/internals/StreamTaskTest.java        | 137 -
 4 files changed, 179 insertions(+), 15 deletions(-)
(kafka) 01/02: MINOR: simplify code which calles `Punctuator.punctuate()` (#16725)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 2c957a6e5c1d3ee9c8251e11694040b85662faf9 Author: Matthias J. Sax AuthorDate: Tue Jul 30 17:10:40 2024 -0700 MINOR: simplify code which calles `Punctuator.punctuate()` (#16725) Reviewers: Bill Bejeck --- .../apache/kafka/streams/processor/internals/ProcessorNode.java| 7 +-- .../org/apache/kafka/streams/processor/internals/StreamTask.java | 4 ++-- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java index 175c9e104ef..eaed7c6b8d5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java @@ -24,7 +24,6 @@ import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.errors.internals.FailedProcessingException; -import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory; @@ -203,7 +202,7 @@ public class ProcessorNode { } catch (final FailedProcessingException | TaskCorruptedException | TaskMigratedException e) { // Rethrow exceptions that should not be handled here throw e; -} catch (final Exception e) { +} catch (final RuntimeException e) { final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( null, // only required to pass for DeserializationExceptionHandler internalProcessorContext.topic(), 
@@ -232,10 +231,6 @@ public class ProcessorNode { } } -public void punctuate(final long timestamp, final Punctuator punctuator) { -punctuator.punctuate(timestamp); -} - public boolean isTerminalNode() { return children.isEmpty(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8cbe5780b90..8b253c6e16a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -806,7 +806,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } catch (final StreamsException exception) { record = null; throw exception; -} catch (final Exception e) { +} catch (final RuntimeException e) { handleException(e); } finally { processorContext.setCurrentNode(null); @@ -914,7 +914,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, } try { -maybeMeasureLatency(() -> node.punctuate(timestamp, punctuator), time, punctuateLatencySensor); +maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor); } catch (final StreamsException e) { throw e; } catch (final RuntimeException e) {
(kafka) 02/02: KAFKA-16448: Handle processing exceptions in punctuate (#16300)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git commit 578fef23558830800b9e1ec561e6ec8448134d82 Author: Sebastien Viale AuthorDate: Thu Aug 1 00:53:47 2024 +0200 KAFKA-16448: Handle processing exceptions in punctuate (#16300) This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing. This PR actually catches processing exceptions from punctuate. Co-authored-by: Dabz Co-authored-by: loicgreffier Reviewers: Bruno Cadonna , Matthias J. Sax --- checkstyle/suppressions.xml| 2 +- .../streams/processor/internals/StreamTask.java| 44 ++- .../processor/internals/StreamTaskTest.java| 137 - 3 files changed, 176 insertions(+), 7 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index ba28341d1a3..8c96ce71237 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -193,7 +193,7 @@ + files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8b253c6e16a..6f2edd442b0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -29,12 +29,14 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.TopologyConfig.TaskConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.LockException; import 
org.apache.kafka.streams.errors.ProcessingExceptionHandler; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskCorruptedException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; import org.apache.kafka.streams.errors.internals.FailedProcessingException; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.PunctuationType; @@ -63,6 +65,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.singleton; +import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor; @@ -101,6 +104,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final Sensor restoreRemainingSensor; private final Sensor punctuateLatencySensor; private final Sensor bufferedRecordsSensor; +private final Sensor droppedRecordsSensor; private final Map e2eLatencySensors = new HashMap<>(); private final RecordQueueCreator recordQueueCreator; @@ -160,6 +164,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics); punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics); bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics); +droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId, streamsMetrics); for (final String terminalNodeName : topology.terminalNodes()) { e2eLatencySensors.put( @@ -915,15 +920,48 @@ public class 
StreamTask extends AbstractTask implements ProcessorNodePunctuator, try { maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor); -} catch (final StreamsException e) { +} catch (final FailedProcessingException e) { +throw createStreamsException(node.name(), e.getCause()); +} catch (final TaskCorruptedException | TaskMigratedException e) { throw e; -} catch (final RuntimeException e) { -throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e); +} catch (final Exception e
(kafka-merge-queue-sandbox) branch test created (now 20001cf)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch test
in repository https://gitbox.apache.org/repos/asf/kafka-merge-queue-sandbox.git

      at 20001cf  Test

This branch includes the following new commits:

     new 20001cf  Test

The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.
(kafka-merge-queue-sandbox) 01/01: Test
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch test
in repository https://gitbox.apache.org/repos/asf/kafka-merge-queue-sandbox.git

commit 20001cffc5b5ecda6fe15c78b3bffbabe31c76b2
Author: Matthias J. Sax
AuthorDate: Wed Jul 31 16:41:09 2024 -0700

    Test
---
 test | 0
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git a/test b/test
new file mode 100644
index 000..e69de29
(kafka) branch trunk updated (f1ef7a5a9f8 -> ed179c8ba76)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from f1ef7a5a9f8  KAFKA-16448: Handle processing exceptions in punctuate (#16300)
     add ed179c8ba76  MINOR: update StreamsGraphTest (#16741)

No new revisions were added by this update.

Summary of changes:
 .../kstream/internals/graph/StreamsGraphTest.java | 55 +++---
 1 file changed, 27 insertions(+), 28 deletions(-)
(kafka-merge-queue-sandbox) branch test deleted (was 20001cf)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch test
in repository https://gitbox.apache.org/repos/asf/kafka-merge-queue-sandbox.git

     was 20001cf  Test

The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(kafka) branch trunk updated: KAFKA-16448: Update documentation (#16776)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 16cc8775331 KAFKA-16448: Update documentation (#16776) 16cc8775331 is described below commit 16cc87753314823a39ef96024eda86e23b12abe1 Author: Sebastien Viale AuthorDate: Fri Aug 2 18:52:24 2024 +0200 KAFKA-16448: Update documentation (#16776) Updated docs for KIP-1033. Reviewers: Matthias J. Sax --- docs/streams/developer-guide/config-streams.html | 93 +++- docs/streams/upgrade-guide.html | 9 +++ 2 files changed, 84 insertions(+), 18 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index f50945778a1..68137dc4b49 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -82,6 +82,7 @@ settings.put(... , ...); num.standby.replicas num.stream.threads probing.rebalance.interval.ms + processing.exception.handler processing.guarantee rack.aware.assignment.non_overlap_cost rack.aware.assignment.strategy @@ -395,83 +396,88 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up. 60 milliseconds (10 minutes) - processing.guarantee + processing.exception.handler +Medium +Exception handling class that implements the ProcessingExceptionHandler interface. +LogAndFailProcessingExceptionHandler + + processing.guarantee Medium The processing mode. Can be either "at_least_once" (default) or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). Deprecated config options are "exactly_once" (for EOS version 1) and "exactly_once_beta" (for EOS version 2, requires broker version 2.5+). 
See Processing Guarantee - poll.ms + poll.ms Low The amount of time in milliseconds to block waiting for input. 100 milliseconds - rack.aware.assignment.tags + rack.aware.assignment.tags Medium List of tag keys used to distribute standby replicas across Kafka Streams clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over clients with different tag values. the empty list - replication.factor + replication.factor Medium The replication factor for changelog topics and repartition topics created by the application. The default of -1 (meaning: use broker default replication factor) requires broker version 2.4 or newer. -1 - retry.backoff.ms + retry.backoff.ms Medium The amount of time in milliseconds, before a request is retried. This applies if the retries parameter is configured to be greater than 0. 100 - rocksdb.config.setter + rocksdb.config.setter Medium The RocksDB configuration. - state.cleanup.delay.ms + state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 60 milliseconds (10 minutes) - state.dir + state.dir High Directory location for state stores. /${java.io.tmpdir}/kafka-streams - task.assignor.class + task.assignor.class Medium A task assignor class or class name implementing the TaskAssignor interface. The high-availability task assignor. - task.timeout.ms + task.timeout.ms Medium The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0 ms, a task would raise an error for the first internal error. For any timeout larger than 0 ms, a task will retry at least once before an error is raised. 30 milliseconds (5 minutes) - topology.optimization + topology.optimization Medium A configuration telling Kafka Streams if it should optimize the topology and what optim
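The newly documented `processing.exception.handler` config is set like any other Streams property. A small sketch, using plain `Properties` with string keys so it stands alone — the application id and bootstrap servers below are placeholders, and the handler class name assumes the KIP-1033 `LogAndContinueProcessingExceptionHandler` in `org.apache.kafka.streams.errors` (the documented default is `LogAndFailProcessingExceptionHandler`):

```java
// Sketch: configuring the KIP-1033 processing exception handler.
// "my-streams-app" and "localhost:9092" are illustrative placeholders;
// a real handler class must implement ProcessingExceptionHandler.
import java.util.Properties;

public class ProcessingHandlerConfigSketch {

    static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // placeholder app id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("processing.exception.handler",
                  "org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler");
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(streamsConfig().getProperty("processing.exception.handler"));
    }
}
```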
(kafka) branch 3.9 updated: KAFKA-16448: Update documentation (#16776)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.9 by this push: new 4afe5f380a2 KAFKA-16448: Update documentation (#16776) 4afe5f380a2 is described below commit 4afe5f380a2cc0a3cc9c15198fb49f59b1672616 Author: Sebastien Viale AuthorDate: Fri Aug 2 18:52:24 2024 +0200 KAFKA-16448: Update documentation (#16776) Updated docs for KIP-1033. Reviewers: Matthias J. Sax --- docs/streams/developer-guide/config-streams.html | 93 +++- docs/streams/upgrade-guide.html | 9 +++ 2 files changed, 84 insertions(+), 18 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index f50945778a1..68137dc4b49 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -82,6 +82,7 @@ settings.put(... , ...); num.standby.replicas num.stream.threads probing.rebalance.interval.ms + processing.exception.handler processing.guarantee rack.aware.assignment.non_overlap_cost rack.aware.assignment.strategy @@ -395,83 +396,88 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up. 60 milliseconds (10 minutes) - processing.guarantee + processing.exception.handler +Medium +Exception handling class that implements the ProcessingExceptionHandler interface. +LogAndFailProcessingExceptionHandler + + processing.guarantee Medium The processing mode. Can be either "at_least_once" (default) or "exactly_once_v2" (for EOS version 2, requires broker version 2.5+). Deprecated config options are "exactly_once" (for EOS version 1) and "exactly_once_beta" (for EOS version 2, requires broker version 2.5+). 
See Processing Guarantee - poll.ms + poll.ms Low The amount of time in milliseconds to block waiting for input. 100 milliseconds - rack.aware.assignment.tags + rack.aware.assignment.tags Medium List of tag keys used to distribute standby replicas across Kafka Streams clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over clients with different tag values. the empty list - replication.factor + replication.factor Medium The replication factor for changelog topics and repartition topics created by the application. The default of -1 (meaning: use broker default replication factor) requires broker version 2.4 or newer. -1 - retry.backoff.ms + retry.backoff.ms Medium The amount of time in milliseconds, before a request is retried. This applies if the retries parameter is configured to be greater than 0. 100 - rocksdb.config.setter + rocksdb.config.setter Medium The RocksDB configuration. - state.cleanup.delay.ms + state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 60 milliseconds (10 minutes) - state.dir + state.dir High Directory location for state stores. /${java.io.tmpdir}/kafka-streams - task.assignor.class + task.assignor.class Medium A task assignor class or class name implementing the TaskAssignor interface. The high-availability task assignor. - task.timeout.ms + task.timeout.ms Medium The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of 0 ms, a task would raise an error for the first internal error. For any timeout larger than 0 ms, a task will retry at least once before an error is raised. 30 milliseconds (5 minutes) - topology.optimization + topology.optimization Medium A configuration telling Kafka Streams if it should optimize the topology and what optim
(kafka) branch trunk updated (5afdb17092f -> 3922cadc5d2)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

    from 5afdb17092f  MINOR: Fix consumer group warmup in MirroMaker 2 integration tests (#16771)
     add 3922cadc5d2  KAFKA-9738: Deprecate old Processor API (#16742)

No new revisions were added by this update.

Summary of changes:
 docs/streams/upgrade-guide.html                    | 10 +++
 .../wordcount/WordCountTransformerTest.java        |  3 +-
 .../org/apache/kafka/streams/StreamsBuilder.java   | 14 ++--
 .../apache/kafka/streams/internals/ApiUtils.java   |  5 +-
 .../kafka/streams/kstream/KeyValueMapper.java      |  4 +-
 .../apache/kafka/streams/kstream/Transformer.java  |  2 +
 .../kafka/streams/kstream/TransformerSupplier.java |  4 +-
 .../kafka/streams/kstream/ValueTransformer.java    |  3 +
 .../streams/kstream/ValueTransformerSupplier.java  |  5 +-
 .../streams/kstream/internals/AbstractStream.java  |  7 +-
 .../kstream/internals/KStreamFlatTransform.java    | 14 ++--
 .../streams/kstream/internals/KStreamImpl.java     | 18 ++--
 .../internals/TransformerSupplierAdapter.java      | 15 ++--
 .../streams/processor/ConnectedStoreProvider.java  | 18 ++--
 .../KStreamTransformIntegrationTest.java           | 41 +++--
 .../integration/StandbyTaskEOSIntegrationTest.java |  7 +-
 .../kstream/internals/AbstractStreamTest.java      |  5 +-
 .../internals/KStreamFlatTransformTest.java        |  9 +-
 .../streams/kstream/internals/KStreamImplTest.java | 97 --
 .../kstream/internals/KStreamTransformTest.java    | 13 ++-
 .../internals/KStreamTransformValuesTest.java      |  6 +-
 .../internals/TransformerSupplierAdapterTest.java  | 32 +++
 .../kafka/test/MockInternalProcessorContext.java   |  4 +-
 .../apache/kafka/test/NoopValueTransformer.java    |  4 +-
 .../streams/scala/FunctionsCompatConversions.scala | 14
 .../streams/processor/MockProcessorContext.java    |  9 +-
 .../processor/api/MockProcessorContext.java        |  6 +-
 .../kafka/streams/MockProcessorContextTest.java    | 52 ++--
 28 files changed, 217 insertions(+), 204 deletions(-)
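KAFKA-9738 above deprecates the old `Transformer`-style API, where `transform()` returns a single result, in favor of the newer Processor API, where a processor forwards zero or more records through its context. The sketch below illustrates only the shape of that difference using stand-in interfaces — these are not Kafka's real `org.apache.kafka.streams.processor.api` types:

```java
// Sketch contrasting the deprecated Transformer shape (one-in, one-out,
// result returned) with the Processor shape (records emitted via a
// forwarding callback, so zero/one/many outputs per input). Stand-ins only.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ProcessorStyleSketch {

    record KeyValue(String key, String value) { }

    // Old style: the return value is the single output record.
    interface OldTransformer {
        KeyValue transform(String key, String value);
    }

    // New style: outputs are forwarded, which naturally supports flat-map
    // behavior that the old API needed extra machinery for.
    interface NewProcessor {
        void process(KeyValue record, Consumer<KeyValue> forward);
    }

    public static void main(final String[] args) {
        final NewProcessor splitter = (record, forward) -> {
            for (final String token : record.value().split(" ")) {
                forward.accept(new KeyValue(record.key(), token));
            }
        };
        final List<KeyValue> out = new ArrayList<>();
        splitter.process(new KeyValue("k", "hello world"), out::add);
        System.out.println(out.size()); // 2
    }
}
```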
(kafka) branch trunk updated: KAFKA-16448: Unify error-callback exception handling (#16745)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 7c539f90298 KAFKA-16448: Unify error-callback exception handling (#16745) 7c539f90298 is described below commit 7c539f902983a76f14d0cd993e7d6dcbfdacd909 Author: Matthias J. Sax AuthorDate: Sat Aug 3 12:40:51 2024 -0700 KAFKA-16448: Unify error-callback exception handling (#16745) Follow up code cleanup for KIP-1033. This PR unifies the handling of both error cases for exception handlers: - handler throws an exception - handler returns null The unification happens for all 5 handler cases: - deserialzation - production / serialization - production / send - processing - punctuation Reviewers: Sebastien Viale , Loic Greffier , Bill Bejeck --- .../errors/DeserializationExceptionHandler.java| 2 +- .../streams/errors/ProductionExceptionHandler.java | 2 +- .../internals/DefaultErrorHandlerContext.java | 12 ++ .../internals/FailedProcessingException.java | 7 +- .../internals/GlobalStateManagerImpl.java | 5 +- .../streams/processor/internals/ProcessorNode.java | 21 ++- .../processor/internals/RecordCollectorImpl.java | 106 +++ .../processor/internals/RecordDeserializer.java| 50 ++--- .../streams/processor/internals/StreamTask.java| 49 +++-- .../streams/integration/EosIntegrationTest.java| 5 +- .../integration/EosV2UpgradeIntegrationTest.java | 6 +- .../ProcessingExceptionHandlerIntegrationTest.java | 30 +-- .../processor/internals/RecordCollectorTest.java | 194 +++ .../internals/RecordDeserializerTest.java | 210 ++--- .../processor/internals/StreamTaskTest.java| 140 ++ 15 files changed, 589 insertions(+), 250 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 
0d64611de67..198a97cce44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -37,7 +37,7 @@ public interface DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception - * @deprecated Since 3.9. Use Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} + * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead. */ @Deprecated default DeserializationHandlerResponse handle(final ProcessorContext context, diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 25aa00f7a92..939b1ecbcd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -59,7 +59,7 @@ public interface ProductionExceptionHandler extends Configurable { * * @param recordthe record that failed to serialize * @param exception the exception that occurred during serialization - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. + * @deprecated Since 3.9. Use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead. 
*/ @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index c907ff3eb89..77500ce3c36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -81,6 +81,18 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { return taskId; } +@Override +public String toString() { +// we do exclude headers on purpose, to not accidentally log user data +return "ErrorHandlerContext{" + +"topic='" + topic + '\'' + +", partition=" + partition + +", offset=" + o
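The cleanup commit above unifies two failure modes shared by all five handler call sites: the handler itself throwing, and the handler returning null. A simplified sketch of that invocation pattern — the names below mirror, but are not, Kafka's internal classes, and the real code applies this per handler type rather than through one generic function:

```java
// Sketch of unified handler invocation: a handler that throws and a handler
// that returns null are both converted into the same fatal error, so every
// call site behaves identically. Stand-in types only.
import java.util.function.BiFunction;

public class HandlerInvocationSketch {

    enum Response { CONTINUE, FAIL }

    static class FailedProcessingException extends RuntimeException {
        FailedProcessingException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static Response invokeHandler(final BiFunction<String, Exception, Response> handler,
                                  final String record,
                                  final Exception cause) {
        final Response response;
        try {
            response = handler.apply(record, cause);
        } catch (final RuntimeException handlerError) {
            // Error case 1: the user handler itself crashed.
            throw new FailedProcessingException(
                "Processing exception handler failed; treating it as fatal", handlerError);
        }
        if (response == null) {
            // Error case 2: the user handler returned null.
            throw new FailedProcessingException(
                "Processing exception handler returned null; treating it as fatal", cause);
        }
        return response;
    }

    public static void main(final String[] args) {
        System.out.println(
            invokeHandler((record, cause) -> Response.CONTINUE, "record", new Exception("oops")));
    }
}
```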
(kafka) branch 3.9 updated: KAFKA-16448: Unify error-callback exception handling (#16745)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.9 by this push: new 2ddbfebecbd KAFKA-16448: Unify error-callback exception handling (#16745) 2ddbfebecbd is described below commit 2ddbfebecbdd082ae18adc88b49b9a3f1811c7ab Author: Matthias J. Sax AuthorDate: Sat Aug 3 12:40:51 2024 -0700 KAFKA-16448: Unify error-callback exception handling (#16745) Follow up code cleanup for KIP-1033. This PR unifies the handling of both error cases for exception handlers: - handler throws an exception - handler returns null The unification happens for all 5 handler cases: - deserialzation - production / serialization - production / send - processing - punctuation Reviewers: Sebastien Viale , Loic Greffier , Bill Bejeck --- .../errors/DeserializationExceptionHandler.java| 2 +- .../streams/errors/ProductionExceptionHandler.java | 2 +- .../internals/DefaultErrorHandlerContext.java | 12 ++ .../internals/FailedProcessingException.java | 7 +- .../internals/GlobalStateManagerImpl.java | 5 +- .../streams/processor/internals/ProcessorNode.java | 21 ++- .../processor/internals/RecordCollectorImpl.java | 106 +++ .../processor/internals/RecordDeserializer.java| 50 ++--- .../streams/processor/internals/StreamTask.java| 49 +++-- .../streams/integration/EosIntegrationTest.java| 5 +- .../integration/EosV2UpgradeIntegrationTest.java | 6 +- .../ProcessingExceptionHandlerIntegrationTest.java | 30 +-- .../processor/internals/RecordCollectorTest.java | 152 +-- .../internals/RecordDeserializerTest.java | 210 ++--- .../processor/internals/StreamTaskTest.java| 140 ++ 15 files changed, 568 insertions(+), 229 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java index 
0d64611de67..198a97cce44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java @@ -37,7 +37,7 @@ public interface DeserializationExceptionHandler extends Configurable { * @param context processor context * @param record record that failed deserialization * @param exception the actual exception - * @deprecated Since 3.9. Use Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} + * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead. */ @Deprecated default DeserializationHandlerResponse handle(final ProcessorContext context, diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java index 25aa00f7a92..939b1ecbcd6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java @@ -59,7 +59,7 @@ public interface ProductionExceptionHandler extends Configurable { * * @param recordthe record that failed to serialize * @param exception the exception that occurred during serialization - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. + * @deprecated Since 3.9. Use {@link #handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, SerializationExceptionOrigin)} instead. 
*/ @Deprecated default ProductionExceptionHandlerResponse handleSerializationException(final ProducerRecord record, diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java index c907ff3eb89..77500ce3c36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java @@ -81,6 +81,18 @@ public class DefaultErrorHandlerContext implements ErrorHandlerContext { return taskId; } +@Override +public String toString() { +// we do exclude headers on purpose, to not accidentally log user data +return "ErrorHandlerContext{" + +"topic='" + topic + '\'' + +", partition=" + partition + +", offset=" + o
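[Editor's note] The unification described in the commit above — a handler that throws and a handler that returns null are treated as the same fatal error — can be sketched in plain Java. `HandlerGuard`, `Response`, and `invokeSafely` are hypothetical names for illustration, not Kafka Streams API:

```java
import java.util.function.Supplier;

public class HandlerGuard {
    enum Response { CONTINUE, FAIL }

    // Both failure modes of a user-supplied exception handler -- throwing an
    // exception, or returning null -- are normalized into the same fatal error,
    // mirroring the unification done for all five handler call sites.
    static Response invokeSafely(Supplier<Response> handler) {
        final Response response;
        try {
            response = handler.get();
        } catch (RuntimeException handlerError) {
            throw new IllegalStateException("Exception handler threw an exception", handlerError);
        }
        if (response == null) {
            throw new IllegalStateException("Exception handler returned null");
        }
        return response;
    }
}
```

A well-behaved handler's response passes through unchanged; either misbehavior surfaces as the same `IllegalStateException`.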
(kafka) branch trunk updated: MINOR: Flaky RestoreIntegrationTest (#16721)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 4c9795eddf6 MINOR: Flaky RestoreIntegrationTest (#16721) 4c9795eddf6 is described below commit 4c9795eddf6ae176dc7ccd1763bb42adc731911e Author: Bill Bejeck AuthorDate: Tue Aug 6 13:05:10 2024 -0400 MINOR: Flaky RestoreIntegrationTest (#16721) Increase some timeouts to work against race condition. Reviewers: Matthias J. Sax --- .../org/apache/kafka/streams/integration/RestoreIntegrationTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index a71bf93980a..bb2e3a6255e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -114,7 +114,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class RestoreIntegrationTest { private static final Logger log = LoggerFactory.getLogger(RestoreIntegrationTest.class); -private static final Duration RESTORATION_DELAY = Duration.ofSeconds(1); +private static final Duration RESTORATION_DELAY = Duration.ofMillis(500); private static final int NUM_BROKERS = 1; @@ -580,7 +580,7 @@ public class RestoreIntegrationTest { validateReceivedMessages(sampleData, outputTopic); // Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state. 
- kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT)); +kafkaStreams.close(Duration.ofMillis(5000L)); IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations); final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener("ks1", RESTORATION_DELAY);
(kafka) branch trunk updated: KAFKA-16584: Make log processing summary configurable or debug--update upgrade-guide (#16709)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 5596f9e1d56 KAFKA-16584: Make log processing summary configurable or debug--update upgrade-guide (#16709) 5596f9e1d56 is described below commit 5596f9e1d561dfef46a83fbd7264e6745ca538cc Author: dujian0068 <1426703...@qq.com> AuthorDate: Wed Aug 7 03:11:33 2024 +0800 KAFKA-16584: Make log processing summary configurable or debug--update upgrade-guide (#16709) Updates Kafka Streams upgrade-guide for KIP-1049. Reviewers: Bill Bejeck , Matthias J. Sax --- docs/streams/upgrade-guide.html | 6 ++ 1 file changed, 6 insertions(+) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index fb260fdee85..ec0124c7bd6 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -152,6 +152,12 @@ The specified handler must implement the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface. + +Kafka Streams now allows to customize the logging interval of stream-thread runtime summary, via the newly added config log.summary.interval.ms. +By default, the summary is logged every 2 minutes. More details can be found in +https://cwiki.apache.org/confluence/display/KAFKA/KIP-1049%3A+Add+config+log.summary.interval.ms+to+Kafka+Streams";>KIP-1049. + + Streams API changes in 3.8.0
(kafka) branch 3.9 updated: KAFKA-16584: Make log processing summary configurable or debug--update upgrade-guide (#16709)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.9 by this push: new c736d02b522 KAFKA-16584: Make log processing summary configurable or debug--update upgrade-guide (#16709) c736d02b522 is described below commit c736d02b52213332e73404097ab26e172080e30d Author: dujian0068 <1426703...@qq.com> AuthorDate: Wed Aug 7 03:11:33 2024 +0800 KAFKA-16584: Make log processing summary configurable or debug--update upgrade-guide (#16709) Updates Kafka Streams upgrade-guide for KIP-1049. Reviewers: Bill Bejeck , Matthias J. Sax --- docs/streams/upgrade-guide.html | 6 ++ 1 file changed, 6 insertions(+) diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 6102219bf10..6a30e6671b1 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -142,6 +142,12 @@ The specified handler must implement the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface. + +Kafka Streams now allows to customize the logging interval of stream-thread runtime summary, via the newly added config log.summary.interval.ms. +By default, the summary is logged every 2 minutes. More details can be found in +https://cwiki.apache.org/confluence/display/KAFKA/KIP-1049%3A+Add+config+log.summary.interval.ms+to+Kafka+Streams";>KIP-1049. + + Streams API changes in 3.8.0
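[Editor's note] The KIP-1049 config added by the commit above can be set like any other Streams property. A minimal sketch — the application id and broker address are placeholders; only `log.summary.interval.ms` and its 2-minute default come from the docs change:

```java
import java.util.Properties;

public class SummaryIntervalConfig {
    // Build a Streams config with the KIP-1049 logging interval overridden.
    static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put("application.id", "my-streams-app");    // placeholder application id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // KIP-1049: log the stream-thread runtime summary every 5 minutes
        // instead of the 2-minute default
        props.put("log.summary.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("log.summary.interval.ms")); // prints 300000
    }
}
```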
(kafka) branch trunk updated: KAFKA-12828: Removed Deprecated methods under KeyQueryMetadata (#16747)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 6b5d172dcfa KAFKA-12828: Removed Deprecated methods under KeyQueryMetadata (#16747) 6b5d172dcfa is described below commit 6b5d172dcfa305dcd9c140be13cd65d4c5656761 Author: abhi-ksolves <113602461+abhi-ksol...@users.noreply.github.com> AuthorDate: Wed Aug 7 06:51:23 2024 +0530 KAFKA-12828: Removed Deprecated methods under KeyQueryMetadata (#16747) Reviewers: Matthias J. Sax --- .../org/apache/kafka/streams/KeyQueryMetadata.java | 33 -- 1 file changed, 33 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java index 9f4311922be..7d10309b34f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java @@ -49,39 +49,6 @@ public class KeyQueryMetadata { this.partition = partition; } -/** - * Get the active Kafka Streams instance for given key. - * - * @return active instance's {@link HostInfo} - * @deprecated since 2.7.0; use {@link #activeHost()} instead. - */ -@Deprecated -public HostInfo getActiveHost() { -return activeHost; -} - -/** - * Get the Kafka Streams instances that host the key as standbys. - * - * @return set of standby {@link HostInfo} or a empty set, if no standbys are configured - * @deprecated since 2.7.0; use {@link #standbyHosts()} instead. - */ -@Deprecated -public Set getStandbyHosts() { -return standbyHosts; -} - -/** - * Get the store partition corresponding to the key. - * - * @return store partition number - * @deprecated since 2.7.0; use {@link #partition()} instead. - */ -@Deprecated -public int getPartition() { -return partition; -} - /** * Get the active Kafka Streams instance for given key. *
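[Editor's note] For callers migrating off the removed methods, the mapping is `getActiveHost()` → `activeHost()`, `getStandbyHosts()` → `standbyHosts()`, `getPartition()` → `partition()`. A self-contained stand-in for the surviving shape of the class (plain `String` host names replace Kafka's `HostInfo` so the sketch compiles on its own):

```java
import java.util.Set;

// Stand-in for KeyQueryMetadata after KAFKA-12828: only the unprefixed
// accessors remain. String replaces Kafka's HostInfo for self-containment.
public class KeyQueryMetadataSketch {
    private final String activeHost;
    private final Set<String> standbyHosts;
    private final int partition;

    public KeyQueryMetadataSketch(String activeHost, Set<String> standbyHosts, int partition) {
        this.activeHost = activeHost;
        this.standbyHosts = standbyHosts;
        this.partition = partition;
    }

    public String activeHost() { return activeHost; }          // replaces getActiveHost()
    public Set<String> standbyHosts() { return standbyHosts; } // replaces getStandbyHosts()
    public int partition() { return partition; }               // replaces getPartition()
}
```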
(kafka) branch trunk updated (0bb2aee8385 -> 9d81a670091)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from 0bb2aee8385 KAFKA-17305; Check broker registrations for missing features (#16848) add 9d81a670091 MINOR: update flaky KafkaStreamsTest (#16756) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/KafkaStreamsTest.java | 92 -- 1 file changed, 49 insertions(+), 43 deletions(-)
(kafka) branch trunk updated: MINOR: remove get prefix for internal Kafka Streams methods (#16722)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new e4cc5d18f45 MINOR: remove get prefix for internal Kafka Streams methods (#16722) e4cc5d18f45 is described below commit e4cc5d18f45d2965e4e1e571adc34bfb80cba642 Author: Matthias J. Sax AuthorDate: Wed Aug 21 14:27:14 2024 -0700 MINOR: remove get prefix for internal Kafka Streams methods (#16722) Reviewers: Lucas Brutschy --- .../streams/examples/wordcount/WordCountDemo.java | 4 +- .../examples/wordcount/WordCountDemoTest.java | 8 ++-- .../org/apache/kafka/streams/KafkaStreams.java | 44 +++--- .../org/apache/kafka/streams/StreamsConfig.java| 4 +- .../org/apache/kafka/streams/TopologyConfig.java | 4 +- .../streams/internals/StreamsConfigUtils.java | 2 +- .../kafka/streams/internals/UpgradeFromValues.java | 2 +- .../kstream/internals/ChangedSerializer.java | 2 +- .../kstream/internals/KTableRepartitionMap.java| 2 +- .../foreignkeyjoin/SubscriptionWrapperSerde.java | 2 +- .../processor/internals/ActiveTaskCreator.java | 10 ++--- .../streams/processor/internals/ClientUtils.java | 11 +++--- .../processor/internals/InternalTopicConfig.java | 2 +- .../processor/internals/InternalTopicManager.java | 8 ++-- .../internals/InternalTopicProperties.java | 2 +- .../internals/InternalTopologyBuilder.java | 6 +-- .../processor/internals/ProcessorContextUtils.java | 4 +- .../internals/RepartitionTopicConfig.java | 2 +- .../processor/internals/StandbyTaskCreator.java| 2 +- .../processor/internals/StoreChangelogReader.java | 2 +- .../streams/processor/internals/StreamThread.java | 30 +++ .../processor/internals/StreamsMetadataState.java | 2 +- .../processor/internals/StreamsProducer.java | 10 ++--- .../processor/internals/TopologyMetadata.java | 12 +++--- .../UnwindowedUnversionedChangelogTopicConfig.java | 2 +- 
.../internals/VersionedChangelogTopicConfig.java | 2 +- .../internals/WindowedChangelogTopicConfig.java| 2 +- .../assignment/AssignorConfiguration.java | 4 +- .../KafkaStreamsNamedTopologyWrapper.java | 2 +- ...stractDualSchemaRocksDBSegmentedBytesStore.java | 2 +- .../AbstractRocksDBSegmentedBytesStore.java| 2 +- .../state/internals/InMemoryWindowStore.java | 2 +- .../streams/state/internals/KeyValueSegments.java | 2 +- .../state/internals/LogicalKeyValueSegments.java | 2 +- .../streams/state/internals/RocksDBStore.java | 4 +- .../state/internals/RocksDBVersionedStore.java | 4 +- .../state/internals/TimestampedSegments.java | 2 +- .../org/apache/kafka/streams/KafkaStreamsTest.java | 2 +- .../apache/kafka/streams/StreamsConfigTest.java| 10 ++--- .../kstream/internals/KStreamKStreamJoinTest.java | 2 +- .../internals/InternalTopicConfigTest.java | 20 +- .../internals/InternalTopicManagerTest.java| 2 +- .../internals/InternalTopologyBuilderTest.java | 10 ++--- .../processor/internals/StreamThreadTest.java | 24 ++-- .../metrics/RocksDBBlockCacheMetricsTest.java | 2 +- 45 files changed, 140 insertions(+), 141 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java index 1664445f098..a9f2be7f64c 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -49,7 +49,7 @@ public final class WordCountDemo { public static final String INPUT_TOPIC = "streams-plaintext-input"; public static final String OUTPUT_TOPIC = "streams-wordcount-output"; -static Properties getStreamsConfig(final String[] args) throws IOException { +static Properties streamsConfig(final String[] args) throws IOException { final Properties props = new Properties(); if (args != null && args.length > 0) 
{ try (final FileInputStream fis = new FileInputStream(args[0])) { @@ -85,7 +85,7 @@ public final class WordCountDemo { } public static void main(final String[] args) throws IOException { -final Properties props = getStreamsConfig(args); +final Properties props = streamsConfig(args); final StreamsBuilder builder = new StreamsBuilder(); createWordCountStream(builder); diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountDemoTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCount
(kafka) branch trunk updated: MINOR: Cleanup Joined class (#14551)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 1e14b0c9640 MINOR: Cleanup Joined class (#14551) 1e14b0c9640 is described below commit 1e14b0c964091121a5116cae18d9494a5d07bb5d Author: Matthias J. Sax AuthorDate: Wed Aug 21 14:31:22 2024 -0700 MINOR: Cleanup Joined class (#14551) Code cleanup and JavaDocs fixed, plus add missing getters to JoinedInternal. Reviewers: Lucas Brutschy --- .../org/apache/kafka/streams/kstream/Joined.java | 222 - .../streams/kstream/internals/JoinedInternal.java | 18 +- .../streams/kstream/internals/KStreamImpl.java | 29 ++- 3 files changed, 153 insertions(+), 116 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java index 55dff2428f8..2978f943f31 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java @@ -24,47 +24,56 @@ import java.time.Duration; * The {@code Joined} class represents optional params that can be passed to * {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} and * {@link KStream#leftJoin(KTable, ValueJoiner) KStream#leftJoin(KTable,...)} operations. 
+ * + * @param type of record key + * @param type of left record value + * @param type of right record value */ -public class Joined implements NamedOperation> { +public class Joined implements NamedOperation> { protected final Serde keySerde; -protected final Serde valueSerde; -protected final Serde otherValueSerde; +protected final Serde leftValueSerde; +protected final Serde rightValueSerde; protected final String name; protected final Duration gracePeriod; private Joined(final Serde keySerde, - final Serde valueSerde, - final Serde otherValueSerde, + final Serde leftValueSerde, + final Serde rightValueSerde, final String name, final Duration gracePeriod) { this.keySerde = keySerde; -this.valueSerde = valueSerde; -this.otherValueSerde = otherValueSerde; +this.leftValueSerde = leftValueSerde; +this.rightValueSerde = rightValueSerde; this.name = name; this.gracePeriod = gracePeriod; } -protected Joined(final Joined joined) { -this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name, joined.gracePeriod); +protected Joined(final Joined joined) { +this(joined.keySerde, joined.leftValueSerde, joined.rightValueSerde, joined.name, joined.gracePeriod); } /** * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances. * {@code null} values are accepted and will be replaced by the default serdes as defined in config. * - * @param keySerdethe key serde to use. If {@code null} the default key serde from config will be used - * @param valueSerde the value serde to use. If {@code null} the default value serde from config will be used - * @param otherValueSerde the otherValue serde to use. If {@code null} the default value serde from config will be used - * @param key type - * @param value type - * @param other value type + * @param keySerde + *the key serde to use. If {@code null} the default key serde from config will be used + * @param leftValueSerde + *the value serde to use. 
If {@code null} the default value serde from config will be used + * @param rightValueSerde + *the otherValue serde to use. If {@code null} the default value serde from config will be used + * + * @param key type + * @param left value type + * @param right value type + * * @return new {@code Joined} instance with the provided serdes */ -public static Joined with(final Serde keySerde, - final Serde valueSerde, - final Serde otherValueSerde) { -return new Joined<>(keySerde, valueSerde, otherValueSerde, null, null); +public static Joined with(final Serde keySerde, + final Serde leftValueSerde, + final Serde rightValueSerde) { +return new Joined<>(keySerde, leftValueSerde, rightValueSerde, null, null); } /** @@ -72,24 +81,26 @@ public class Joined implements NamedOperation> { * {@
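[Editor's note] The rename pattern visible in the diff above — `valueSerde` → `leftValueSerde`, `otherValueSerde` → `rightValueSerde` — can be mirrored in a self-contained sketch. `JoinedSketch` is illustrative only; `String` stands in for `Serde<T>`, and everything beyond the field names shown in the diff is an assumption:

```java
// Self-contained mirror of the Joined cleanup: the ambiguous "other value"
// terminology is replaced by explicit left/right naming.
public class JoinedSketch {
    final String keySerde;
    final String leftValueSerde;   // was: valueSerde
    final String rightValueSerde;  // was: otherValueSerde

    private JoinedSketch(String keySerde, String leftValueSerde, String rightValueSerde) {
        this.keySerde = keySerde;
        this.leftValueSerde = leftValueSerde;
        this.rightValueSerde = rightValueSerde;
    }

    // Factory mirroring Joined.with(keySerde, leftValueSerde, rightValueSerde);
    // null means "fall back to the default serde from config".
    public static JoinedSketch with(String keySerde, String leftValueSerde, String rightValueSerde) {
        return new JoinedSketch(keySerde, leftValueSerde, rightValueSerde);
    }
}
```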
(kafka) branch trunk updated (a3aa6372ea1 -> a2f89f5412f)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git from a3aa6372ea1 KAFKA-17336 Add IT to make sure the production MV does not use unstable version of LIST_OFFSET (#16893) add a2f89f5412f KAFKA-16333: remove deprecated join method with named param (#16764) No new revisions were added by this update. Summary of changes: .../org/apache/kafka/streams/kstream/KTable.java | 98 -- .../streams/kstream/internals/KTableImpl.java | 65 -- .../kafka/streams/scala/kstream/KTable.scala | 46 -- 3 files changed, 209 deletions(-)
(kafka) branch 3.9 updated: MINOR: fix HTML for topology.optimization config (#16953)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.9 by this push: new 1f6d5aec829 MINOR: fix HTML for topology.optimization config (#16953) 1f6d5aec829 is described below commit 1f6d5aec82908177c1d66eb2920d2644d00f7df8 Author: Matthias J. Sax AuthorDate: Thu Aug 22 00:24:16 2024 -0700 MINOR: fix HTML for topology.optimization config (#16953) The HTML rendering broke via https://issues.apache.org/jira/browse/KAFKA-14209 in 3.4 release. The currently shown value is some garbage org.apache.kafka.streams.StreamsConfig$$Lambda$20/0x000800c0cf18@b1bc7ed cf https://kafka.apache.org/documentation/#streamsconfigs_topology.optimization Verified the fix via running StreamsConfig#main() locally. Reviewers: Bill Bejeck , Chia-Ping Tsai --- .../main/java/org/apache/kafka/streams/StreamsConfig.java| 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d1ffd6a5878..61cb804b465 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1035,7 +1035,17 @@ public class StreamsConfig extends AbstractConfig { .define(TOPOLOGY_OPTIMIZATION_CONFIG, Type.STRING, NO_OPTIMIZATION, -(name, value) -> verifyTopologyOptimizationConfigs((String) value), +new ConfigDef.Validator() { +@Override +public void ensureValid(final String name, final Object value) { +verifyTopologyOptimizationConfigs((String) value); +} + +@Override +public String toString() { +return TOPOLOGY_OPTIMIZATION_CONFIGS.toString(); +} +}, Importance.MEDIUM, TOPOLOGY_OPTIMIZATION_DOC)
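[Editor's note] The root cause of the garbage value in the rendered docs is that a lambda's default `toString()` is an opaque JVM-generated identity string, which `ConfigDef` prints when documenting valid values; an anonymous class can override `toString()`. A self-contained sketch of the before/after — the `Validator` interface here is a stand-in, not Kafka's `ConfigDef.Validator`:

```java
public class ValidatorToStringDemo {
    // Stand-in for ConfigDef.Validator
    interface Validator {
        void ensureValid(String name, Object value);
    }

    static final String VALID_VALUES =
        "[all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join]";

    public static void main(String[] args) {
        // Before the fix: a lambda, whose toString() is a JVM-generated
        // string along the lines of "...$$Lambda$20/0x...@..."
        Validator lambdaValidator = (name, value) -> { };
        System.out.println(lambdaValidator);

        // After the fix: an anonymous class overriding toString(), so the
        // generated docs show the actual list of valid values
        Validator explicitValidator = new Validator() {
            @Override
            public void ensureValid(String name, Object value) { }

            @Override
            public String toString() {
                return VALID_VALUES;
            }
        };
        System.out.println(explicitValidator);
    }
}
```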
(kafka-site) branch minor-topology-optimization-config created (now 51a60997f)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch minor-topology-optimization-config in repository https://gitbox.apache.org/repos/asf/kafka-site.git at 51a60997f MINOR: fix config topology.optimization No new revisions were added by this update.
(kafka) branch 3.8 updated: MINOR: fix HTML for topology.optimization config (#16953)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.8 by this push: new ae731bc8619 MINOR: fix HTML for topology.optimization config (#16953) ae731bc8619 is described below commit ae731bc8619a237be297ae16a402e416dc1151b3 Author: Matthias J. Sax AuthorDate: Thu Aug 22 00:24:16 2024 -0700 MINOR: fix HTML for topology.optimization config (#16953) The HTML rendering broke via https://issues.apache.org/jira/browse/KAFKA-14209 in 3.4 release. The currently shown value is some garbage org.apache.kafka.streams.StreamsConfig$$Lambda$20/0x000800c0cf18@b1bc7ed cf https://kafka.apache.org/documentation/#streamsconfigs_topology.optimization Verified the fix via running StreamsConfig#main() locally. Reviewers: Bill Bejeck , Chia-Ping Tsai --- .../main/java/org/apache/kafka/streams/StreamsConfig.java| 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 502eab8eb87..fcc31e0e190 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1010,7 +1010,17 @@ public class StreamsConfig extends AbstractConfig { .define(TOPOLOGY_OPTIMIZATION_CONFIG, Type.STRING, NO_OPTIMIZATION, -(name, value) -> verifyTopologyOptimizationConfigs((String) value), +new ConfigDef.Validator() { +@Override +public void ensureValid(final String name, final Object value) { +verifyTopologyOptimizationConfigs((String) value); +} + +@Override +public String toString() { +return TOPOLOGY_OPTIMIZATION_CONFIGS.toString(); +} +}, Importance.MEDIUM, TOPOLOGY_OPTIMIZATION_DOC)
(kafka) branch 3.7 updated: MINOR: fix HTML for topology.optimization config (#16953)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.7 by this push: new 89be50f5827 MINOR: fix HTML for topology.optimization config (#16953) 89be50f5827 is described below commit 89be50f58279ebbe3f5c57381835036c3b5be80a Author: Matthias J. Sax AuthorDate: Thu Aug 22 00:24:16 2024 -0700 MINOR: fix HTML for topology.optimization config (#16953) The HTML rendering broke via https://issues.apache.org/jira/browse/KAFKA-14209 in 3.4 release. The currently shown value is some garbage org.apache.kafka.streams.StreamsConfig$$Lambda$20/0x000800c0cf18@b1bc7ed cf https://kafka.apache.org/documentation/#streamsconfigs_topology.optimization Verified the fix via running StreamsConfig#main() locally. Reviewers: Bill Bejeck , Chia-Ping Tsai --- .../main/java/org/apache/kafka/streams/StreamsConfig.java| 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 14f74333e97..757e0dc857a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -988,7 +988,17 @@ public class StreamsConfig extends AbstractConfig { .define(TOPOLOGY_OPTIMIZATION_CONFIG, Type.STRING, NO_OPTIMIZATION, -(name, value) -> verifyTopologyOptimizationConfigs((String) value), +new ConfigDef.Validator() { +@Override +public void ensureValid(final String name, final Object value) { +verifyTopologyOptimizationConfigs((String) value); +} + +@Override +public String toString() { +return TOPOLOGY_OPTIMIZATION_CONFIGS.toString(); +} +}, Importance.MEDIUM, TOPOLOGY_OPTIMIZATION_DOC)
(kafka) branch 3.6 updated: MINOR: fix HTML for topology.optimization config (#16953)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.6 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/3.6 by this push: new c888ab9c612 MINOR: fix HTML for topology.optimization config (#16953) c888ab9c612 is described below commit c888ab9c612ee492c0bb31495efefff0aba6e2ea Author: Matthias J. Sax AuthorDate: Thu Aug 22 00:24:16 2024 -0700 MINOR: fix HTML for topology.optimization config (#16953) The HTML rendering broke via https://issues.apache.org/jira/browse/KAFKA-14209 in 3.4 release. The currently shown value is some garbage org.apache.kafka.streams.StreamsConfig$$Lambda$20/0x000800c0cf18@b1bc7ed cf https://kafka.apache.org/documentation/#streamsconfigs_topology.optimization Verified the fix via running StreamsConfig#main() locally. Reviewers: Bill Bejeck , Chia-Ping Tsai --- .../main/java/org/apache/kafka/streams/StreamsConfig.java| 12 +++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 1cd80b2093d..edf807eb723 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -965,7 +965,17 @@ public class StreamsConfig extends AbstractConfig { .define(TOPOLOGY_OPTIMIZATION_CONFIG, Type.STRING, NO_OPTIMIZATION, -(name, value) -> verifyTopologyOptimizationConfigs((String) value), +new ConfigDef.Validator() { +@Override +public void ensureValid(final String name, final Object value) { +verifyTopologyOptimizationConfigs((String) value); +} + +@Override +public String toString() { +return TOPOLOGY_OPTIMIZATION_CONFIGS.toString(); +} +}, Importance.MEDIUM, TOPOLOGY_OPTIMIZATION_DOC)
(kafka-site) branch asf-site updated: MINOR: fix config topology.optimization (#626)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/kafka-site.git The following commit(s) were added to refs/heads/asf-site by this push: new c21515225 MINOR: fix config topology.optimization (#626) c21515225 is described below commit c215152258a4d646d508fece77bc6bc357b0dd29 Author: Matthias J. Sax AuthorDate: Thu Aug 22 18:34:38 2024 -0700 MINOR: fix config topology.optimization (#626) Reviewers: Bill Bejeck --- 34/generated/streams_config.html | 2 +- 35/generated/streams_config.html | 2 +- 36/generated/streams_config.html | 2 +- 37/generated/streams_config.html | 2 +- 38/generated/streams_config.html | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/34/generated/streams_config.html b/34/generated/streams_config.html index d3d80be5e..39ffb73a7 100644 --- a/34/generated/streams_config.html +++ b/34/generated/streams_config.html @@ -255,7 +255,7 @@ Type:string Default:none -Valid Values:org.apache.kafka.streams.StreamsConfig$$Lambda$3/521645586@49c2faae +Valid Values:[all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join] Importance:medium diff --git a/35/generated/streams_config.html b/35/generated/streams_config.html index 02c9ef812..c10ef409e 100644 --- a/35/generated/streams_config.html +++ b/35/generated/streams_config.html @@ -255,7 +255,7 @@ Type:string Default:none -Valid Values:org.apache.kafka.streams.StreamsConfig$$Lambda$21/0x000800c0c208@b1bc7ed +Valid Values:[all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join] Importance:medium diff --git a/36/generated/streams_config.html b/36/generated/streams_config.html index 92674cd59..ebecc868d 100644 --- a/36/generated/streams_config.html +++ b/36/generated/streams_config.html @@ -285,7 +285,7 @@ Type:string Default:none -Valid Values:org.apache.kafka.streams.StreamsConfig$$Lambda$8/835648992@1a86f2f1 +Valid 
Values:[all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join] Importance:medium diff --git a/37/generated/streams_config.html b/37/generated/streams_config.html index dc9297fdd..89f31685a 100644 --- a/37/generated/streams_config.html +++ b/37/generated/streams_config.html @@ -285,7 +285,7 @@ Type:string Default:none -Valid Values:org.apache.kafka.streams.StreamsConfig$$Lambda$22/0x00030100d828@3d494fbf +Valid Values:[all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join] Importance:medium diff --git a/38/generated/streams_config.html b/38/generated/streams_config.html index 9f4a1046f..0b032f0bb 100644 --- a/38/generated/streams_config.html +++ b/38/generated/streams_config.html @@ -295,7 +295,7 @@ Type:string Default:none -Valid Values:org.apache.kafka.streams.StreamsConfig$$Lambda$20/0x000800c0cf18@b1bc7ed +Valid Values:[all, none, reuse.ktable.source.topics, merge.repartition.topics, single.store.self.join] Importance:medium
(kafka-site) branch minor-topology-optimization-config deleted (was 51a60997f)
This is an automated email from the ASF dual-hosted git repository. mjsax pushed a change to branch minor-topology-optimization-config in repository https://gitbox.apache.org/repos/asf/kafka-site.git was 51a60997f MINOR: fix config topology.optimization The revisions that were on this branch are still contained in other references; therefore, this change does not discard any commits from the repository.
(kafka) branch trunk updated: KAFKA-17371: Fix DefaultTaskExecutorTest.shouldUnassignTaskWhenRequired (#16941)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 46597638eee KAFKA-17371: Fix DefaultTaskExecutorTest.shouldUnassignTaskWhenRequired (#16941)

46597638eee is described below

commit 4659763815d2d572d79cba9645e467a16d98
Author: TengYao Chi
AuthorDate: Fri Aug 23 09:44:58 2024 +0800

    KAFKA-17371: Fix DefaultTaskExecutorTest.shouldUnassignTaskWhenRequired (#16941)

    Reviewers: Matthias J. Sax
---
 .../streams/processor/internals/tasks/DefaultTaskExecutorTest.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
index 7ee1f247934..a538cdf66d0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -35,7 +36,6 @@ import java.util.Collections;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -230,7 +230,9 @@ public class DefaultTaskExecutorTest {
         taskExecutor.start();

         verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
-        assertNotNull(taskExecutor.currentTask());
+        TestUtils.waitForCondition(() -> taskExecutor.currentTask() != null,
+            VERIFICATION_TIMEOUT,
+            "Task reassign take too much time");

         final KafkaFuture future = taskExecutor.unassign();
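The flaky assertion failed because `assignNextTask()` runs on the executor's own thread, so `currentTask()` may still be null at the instant the test thread checks it; the fix polls the condition instead of asserting once. A minimal, self-contained sketch of that polling pattern — the `waitForCondition` helper below is a simplified stand-in for Kafka's internal `TestUtils.waitForCondition`, not the real implementation:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

public class WaitForConditionSketch {
    // Poll a condition until it holds or a deadline passes, instead of
    // asserting a state that an asynchronous thread may not have reached yet.
    static void waitForCondition(final BooleanSupplier condition,
                                 final long timeoutMs,
                                 final String message) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(message);
            }
            Thread.sleep(10); // back off briefly between polls
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final AtomicReference<String> currentTask = new AtomicReference<>();
        // Simulate the executor thread assigning the task asynchronously.
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            currentTask.set("task-0_0");
        }).start();
        // An immediate assertNotNull here could race; polling does not.
        waitForCondition(() -> currentTask.get() != null, 1000, "task was never assigned");
        System.out.println("assigned: " + currentTask.get());
    }
}
```

The same race-free shape is what the committed change uses, with `VERIFICATION_TIMEOUT` as the deadline.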
(kafka) branch trunk updated: KAFKA-16334: Remove --bootstrap-servers from kafka-streams-application-reset
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new c7f0ade7e06 KAFKA-16334: Remove --bootstrap-servers from kafka-streams-application-reset

c7f0ade7e06 is described below

commit c7f0ade7e06598d36bf5572dc76f0bdb3228c46a
Author: Caio Guedes
AuthorDate: Fri Aug 23 03:55:25 2024 +0200

    KAFKA-16334: Remove --bootstrap-servers from kafka-streams-application-reset

    Reviewers: Matthias J. Sax, Chia-Ping Tsai
---
 .../java/org/apache/kafka/tools/StreamsResetter.java | 17 +----------------
 1 file changed, 1 insertion(+), 16 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index 5af08cd0631..a4894e22875 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -136,8 +136,6 @@ public class StreamsResetter {
         String bootstrapServerValue = "localhost:9092";
         if (options.hasBootstrapServer()) {
             bootstrapServerValue = options.bootstrapServer();
-        } else if (options.hasBootstrapServers()) {
-            bootstrapServerValue = options.bootstrapServers();
         }

         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerValue);
@@ -546,7 +544,6 @@ public class StreamsResetter {
     }

     private static class StreamsResetterOptions extends CommandDefaultOptions {
-        private final OptionSpec bootstrapServersOption;
         private final OptionSpec bootstrapServerOption;
         private final OptionSpec applicationIdOption;
         private final OptionSpec inputTopicsOption;
@@ -570,11 +567,7 @@ public class StreamsResetter {
                 .ofType(String.class)
                 .describedAs("id")
                 .required();
-            bootstrapServersOption = parser.accepts("bootstrap-servers", "DEPRECATED: Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
-                .withRequiredArg()
-                .ofType(String.class)
-                .describedAs("urls");
-            bootstrapServerOption = parser.accepts("bootstrap-server", "REQUIRED unless --bootstrap-servers(deprecated) is specified. The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. (default: localhost:9092)")
+            bootstrapServerOption = parser.accepts("bootstrap-server", "The server(s) to connect to. The broker list string in the form HOST1:PORT1,HOST2:PORT2. (default: localhost:9092)")
                 .withRequiredArg()
                 .ofType(String.class)
                 .describedAs("server to connect to");
@@ -668,14 +661,6 @@ public class StreamsResetter {
             return options.valueOf(bootstrapServerOption);
         }

-        public boolean hasBootstrapServers() {
-            return options.has(bootstrapServersOption);
-        }
-
-        public String bootstrapServers() {
-            return options.valueOf(bootstrapServersOption);
-        }
-
         public boolean hasForce() {
             return options.has(forceOption);
         }
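With the deprecated `--bootstrap-servers` path gone, server resolution reduces to a single flag with a documented default. A minimal sketch of that fallback logic — `parseBootstrapServer` is a hypothetical helper for illustration, not the tool's actual joptsimple-based parser:

```java
public class ResetterArgsSketch {
    // Resolve the bootstrap server the way the simplified tool does:
    // use --bootstrap-server if present, otherwise fall back to the default.
    static String parseBootstrapServer(final String[] args) {
        for (int i = 0; i < args.length - 1; i++) {
            if (args[i].equals("--bootstrap-server")) {
                return args[i + 1];
            }
        }
        return "localhost:9092"; // documented default
    }

    public static void main(final String[] args) {
        // No flag given: the default applies.
        System.out.println(parseBootstrapServer(new String[]{"--application-id", "my-app"}));
        // Flag given: the supplied broker list wins.
        System.out.println(parseBootstrapServer(
            new String[]{"--bootstrap-server", "broker1:9092,broker2:9092"}));
    }
}
```

Because only one option remains, there is no longer a precedence question between two competing flags.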
(kafka) branch 3.5 updated: MINOR: fix HTML for topology.optimization config (#16953)
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/3.5 by this push:
     new 1361d5cd5bc MINOR: fix HTML for topology.optimization config (#16953)

1361d5cd5bc is described below

commit 1361d5cd5bcb620367a67f2151b257657cc6f907
Author: Matthias J. Sax
AuthorDate: Thu Aug 22 00:24:16 2024 -0700

    MINOR: fix HTML for topology.optimization config (#16953)

    The HTML rendering broke via https://issues.apache.org/jira/browse/KAFKA-14209 in the 3.4 release.
    The currently shown value is garbage:
        org.apache.kafka.streams.StreamsConfig$$Lambda$20/0x000800c0cf18@b1bc7ed
    cf https://kafka.apache.org/documentation/#streamsconfigs_topology.optimization

    Verified the fix by running StreamsConfig#main() locally.

    Reviewers: Bill Bejeck, Chia-Ping Tsai
---
 .../main/java/org/apache/kafka/streams/StreamsConfig.java | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index fdd549d4c42..707fbea6052 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -918,7 +918,17 @@ public class StreamsConfig extends AbstractConfig {
             .define(TOPOLOGY_OPTIMIZATION_CONFIG,
                 Type.STRING,
                 NO_OPTIMIZATION,
-                (name, value) -> verifyTopologyOptimizationConfigs((String) value),
+                new ConfigDef.Validator() {
+                    @Override
+                    public void ensureValid(final String name, final Object value) {
+                        verifyTopologyOptimizationConfigs((String) value);
+                    }
+
+                    @Override
+                    public String toString() {
+                        return TOPOLOGY_OPTIMIZATION_CONFIGS.toString();
+                    }
+                },
                 Importance.MEDIUM,
                 TOPOLOGY_OPTIMIZATION_DOC)
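The root cause: the generated config table renders each validator via `toString()`, and a lambda inherits `Object`'s default `toString`, producing the class-plus-hash garbage shown in the commit message, whereas an anonymous class can override `toString` to print the valid values. A self-contained sketch of the difference — `Validator` here is a stand-in interface, not Kafka's `ConfigDef.Validator`, and the exact lambda `toString` format varies by JDK:

```java
import java.util.Arrays;
import java.util.List;

public class ValidatorToStringSketch {
    // Stand-in for ConfigDef.Validator: a single-method (functional) interface.
    interface Validator {
        void ensureValid(String name, Object value);
    }

    public static void main(final String[] args) {
        final List<String> validValues = Arrays.asList("none", "all");

        // A lambda gets Object's toString: something like
        // "ValidatorToStringSketch$$Lambda$14/0x...@1b6d3586" (JDK-dependent).
        final Validator lambda = (name, value) -> { };

        // An anonymous class can override toString to render the valid values,
        // which is what the committed fix does with TOPOLOGY_OPTIMIZATION_CONFIGS.
        final Validator anonymous = new Validator() {
            @Override
            public void ensureValid(final String name, final Object value) { }

            @Override
            public String toString() {
                return validValues.toString();
            }
        };

        System.out.println(lambda);    // class-plus-hash garbage
        System.out.println(anonymous); // [none, all]
    }
}
```

This is why the fix trades four lines of lambda for an anonymous class: the validation logic is unchanged, only the doc-generation output differs.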