cadonna commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183390548


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -106,7 +106,11 @@ private static class ValueList {
         }
 
         int next() {
-            return (index < values.length) ? values[index++] : -1;
+            final int v = values[index++];
+            if (index >= values.length) {
+                index = 0;
+            }

Review Comment:
   Doesn't this risk to bring a lot of disorder into the timestamps? I am 
referring to the comment on line 100. What are the consequences of such a 
disorder? 



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite 
changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can 
process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, 
counter, extra_properties)

Review Comment:
   I do not understand `from_version[:-2]` here. Doesn't this return a sublist? 



##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##########
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
+    static ProcessorSupplier<Object, Object, Void, Void> 
printTaskProcessorSupplier(final String topic) {
+        return printTaskProcessorSupplier(topic, "");
+    }
+
     static ProcessorSupplier<Object, Object, Void, Void> 
printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
 
+    static ProcessorSupplier<Object, Object, Void, Void> 
printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   It seems the parameter `name` is not used.



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, 
from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite 
changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and 
upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can 
process records
+        # written by old version)

Review Comment:
   Do we have any guarantee that the instance on the new version do actually 
read any records from the repartition topic?
   Do we need to insert some sleeps to ensure that enough records are written 
to the repartition topics?
   Do we need to increase the commit interval to ensure that records are not 
purged from the repartition topic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to