fqaiser94 commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1190413444
##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
         self.stop_and_await()
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+

Review Comment:
```suggestion
        # verify that old version can process records from new version
        self.wait_for_table_agg_success('A,B')
```

Can we assert this condition earlier, like here? I don't think we need to bounce the remaining instance before asserting this condition.
Here's how I'm thinking about it:

![image](https://github.com/apache/kafka/assets/20507243/7277a4df-0c93-4a42-8f7c-4d7adb02f6a6)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org