[GitHub] [kafka] cadonna commented on a diff in pull request #13656: KAFKA-14911: Add system tests for rolling upgrade path of KIP-904
cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1187176080

## tests/kafkatest/tests/streams/streams_upgrade_test.py: ##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         self.stop_and_await()
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version and upgrades one-by-one to
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        # - old version: value=A
+        # - new version with `upgrade_from` flag set: value=B
+        # - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)

Review Comment:
I see the check, but it does not guarantee that the instance that was not rolled had enough time to write records in the old format to partitions that will be read by the rolled instances. Nor does it guarantee that records the not-yet-rolled instance may have written to those partitions were not already consumed by that same instance before the rolled instances started processing. The same is true for the subsequent rolls.
Does this make sense, or am I missing something?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
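A minimal sketch of one way the timing assumption questioned above could become an explicit check: before the next bounce, poll until every rolled instance reports having processed at least one old-format value (`A`). The polling helper follows the shape of ducktape's `wait_until(condition, timeout_sec, backoff_sec, err_msg)` but is reimplemented here so the sketch is self-contained (it raises Python's built-in `TimeoutError`). The mapping `observed_values_by_instance`, and how a test would populate it (for example, by parsing each processor's output), are hypothetical and not part of the PR.

```python
import time
from typing import Callable, Iterable, Mapping


def wait_until(condition: Callable[[], bool], timeout_sec: float,
               backoff_sec: float = 0.1, err_msg: str = "") -> None:
    """Poll `condition` until it returns True; raise TimeoutError after `timeout_sec`."""
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(backoff_sec)
    raise TimeoutError(err_msg)


def rolled_instances_saw_old_format(observed_values_by_instance: Mapping[str, Iterable[str]],
                                    rolled_instances: Iterable[str],
                                    old_value: str = "A") -> bool:
    """True once every rolled instance has processed at least one old-format value.

    `observed_values_by_instance` is a hypothetical mapping from instance name to the
    aggregate values that instance has processed so far.
    """
    return all(old_value in observed_values_by_instance.get(name, ())
               for name in rolled_instances)


# Made-up observations: p1 has already seen an old-format value, p2 has not.
observed = {"p1": ["A", "B"], "p2": ["B"]}
print(rolled_instances_saw_old_format(observed, ["p1"]))        # True
print(rolled_instances_saw_old_format(observed, ["p1", "p2"]))  # False
```

Because the condition looks at what the rolled instances actually processed rather than at elapsed time, it would catch both cases described in the comment: old-format records that were never written in time, and old-format records that the not-yet-rolled instance consumed itself.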
cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1187159604

## tests/kafkatest/tests/streams/streams_upgrade_test.py: ##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         self.stop_and_await()
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version and upgrades one-by-one to
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        # - old version: value=A
+        # - new version with `upgrade_from` flag set: value=B
+        # - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)

Review Comment:
Ah, I misinterpreted the code. I thought the whole list of from_versions was passed into the function. Now I see that it is just one version, obviously. My fault, sorry!
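The resolution above hinges on how ducktape's `@matrix` expands its arguments: the test method runs once per combination of parameter values, so inside the method `from_version` is a single version string and `from_version[:-2]` is string slicing, not a sublist. The sketch below models that expansion with `itertools.product`; `expand_matrix`, `major_minor`, and the version strings are hypothetical stand-ins, since the contents of `table_agg_versions` and `DEV_VERSION` are not quoted in this thread.

```python
from itertools import product

# Hypothetical values; the real table_agg_versions list and DEV_VERSION are not shown here.
table_agg_versions = ["3.2.3", "3.3.2", "3.4.0"]
dev_version = "3.5.0-SNAPSHOT"


def expand_matrix(**params):
    """Rough model of @matrix: one dict of keyword arguments per combination of values."""
    keys = list(params)
    return [dict(zip(keys, combo)) for combo in product(*(params[k] for k in keys))]


def major_minor(version: str) -> str:
    """Keep only the first two dotted components, regardless of the patch length."""
    return ".".join(version.split(".")[:2])


cases = expand_matrix(from_version=table_agg_versions, to_version=[dev_version])
for case in cases:
    from_version = case["from_version"]  # a single str per test invocation, not a list
    print(from_version, from_version[:-2], major_minor(from_version))
# 3.2.3 3.2 3.2
# 3.3.2 3.3 3.3
# 3.4.0 3.4 3.4
```

Note that `[:-2]` drops exactly two characters, so it yields the major.minor prefix only while the patch component is a single digit; for a hypothetical `"2.8.10"` it would return `"2.8."`, whereas splitting on dots still gives `"2.8"`.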
cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183390548

## streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java: ##
@@ -106,7 +106,11 @@ private static class ValueList {
         }
         int next() {
-            return (index < values.length) ? values[index++] : -1;
+            final int v = values[index++];
+            if (index >= values.length) {
+                index = 0;
+            }

Review Comment:
Doesn't this risk bringing a lot of disorder into the timestamps? I am referring to the comment on line 100. What are the consequences of such disorder?

## tests/kafkatest/tests/streams/streams_upgrade_test.py: ##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         self.stop_and_await()
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version and upgrades one-by-one to
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        # - old version: value=A
+        # - new version with `upgrade_from` flag set: value=B
+        # - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)

Review Comment:
I do not understand `from_version[:-2]` here. Doesn't this return a sublist?

## streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java: ##
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
     final static int END = Integer.MAX_VALUE;
+    static ProcessorSupplier printTaskProcessorSupplier(final String topic) {
+        return printTaskProcessorSupplier(topic, "");
+    }
+
     static ProcessorSupplier printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
+    static ProcessorSupplier printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
It seems the parameter `name` is not used.
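To make the `ValueList.next()` question in the SmokeTestDriver.java hunk concrete, here is a Python model of the two variants it shows: the original hands out each value once and then returns the sentinel `-1`, while the proposed version wraps `index` back to `0` and starts repeating values. Only the `next()` logic is modeled, and the class below is a hypothetical stand-in rather than the Java class; how the repeated values affect timestamps depends on driver code (the comment on line 100) that is not quoted here.

```python
from typing import List, Sequence


class ValueList:
    """Python model of the `next()` logic quoted from SmokeTestDriver (not the Java class itself)."""

    def __init__(self, values: Sequence[int]) -> None:
        self.values: List[int] = list(values)
        self.index = 0

    def next_bounded(self) -> int:
        # Original: (index < values.length) ? values[index++] : -1
        if self.index < len(self.values):
            value = self.values[self.index]
            self.index += 1
            return value
        return -1

    def next_cyclic(self) -> int:
        # Proposed: always return a value and wrap index back to 0 at the end of the array.
        value = self.values[self.index]
        self.index += 1
        if self.index >= len(self.values):
            self.index = 0
        return value


bounded = ValueList([10, 11, 12])
cyclic = ValueList([10, 11, 12])
print([bounded.next_bounded() for _ in range(5)])  # [10, 11, 12, -1, -1]
print([cyclic.next_cyclic() for _ in range(5)])    # [10, 11, 12, 10, 11]
```

With the cyclic variant `-1` is never returned, so any caller that treated `-1` as "no more values" (the callers are not part of the quoted hunk) would instead keep receiving values it had already been given.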
## tests/kafkatest/tests/streams/streams_upgrade_test.py: ##
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         self.stop_and_await()
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version and upgrades one-by-one to
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        # - old version: value=A
+        # - new version with `upgrade_from` flag set: value=B
+        # - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)

Review Comment:
Do we have any guarantee that the instance on the new version do actually