fqaiser94 commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1190418116


##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite 
changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can 
process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification 
purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, 
extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')

Review Comment:
   ```suggestion
           # bounce remaining instance on old version to produce a new unique 
value
           extra_properties = extra_properties.copy()
           extra_properties['test.agg_produce_value'] = 'C'
           extra_properties['test.expected_agg_values'] = 'A,B,C'
           self.do_stop_start_bounce(p3, None, from_version, counter, 
extra_properties)
           counter = counter + 1
           
           # verify that new version can process records from old version
           self.wait_for_table_agg_success('A,B,C')
   ```
   
   I think it might be better to have this old (not-upgraded) instance start 
producing a new value `'C'` when we bounce it here. That way, we can assert 
using `self.wait_for_table_agg_success('A,B,C')` and be sure that the two 
upgraded instances have successfully processed messages from the old 
(not-upgraded) instance as well. 
   
   (Note, if you accept this change, you will need to make changes below here 
to produce new unique values like D,E,F, etc.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to