mjsax commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1182064875


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -126,7 +130,7 @@ static void generatePerpetually(final String kafka,
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
         }
 
-        final Random rand = new Random();
+        final Random rand = new Random(System.currentTimeMillis());

Review Comment:
   Minor side improvement
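   One reason to seed explicitly is reproducibility: if the seed is captured (e.g. logged), a failing run can be replayed exactly. A minimal Python sketch of that idea (illustrative only, not code from the PR):

   ```python
   import random
   import time

   # Illustrative: capture the seed so a failing run can be reproduced.
   seed = int(time.time() * 1000)  # analogous to System.currentTimeMillis()
   rand = random.Random(seed)

   # Re-seeding with the same value replays the exact same sequence.
   replay = random.Random(seed)
   assert [rand.randint(0, 9) for _ in range(5)] == [replay.randint(0, 9) for _ in range(5)]
   ```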



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),

Review Comment:
   Changed this to use `v` as key -- works just fine
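   A plain-Python sketch of the re-keying this comment describes: the table's value `v` becomes the grouping key and a fixed marker value is emitted per record (a dict stands in for the KTable; names are illustrative):

   ```python
   # Hypothetical sketch of KTable#groupBy((k, v) -> KeyValue(v, aggProduceValue)):
   # group the table's entries by their *value*, emitting a fixed marker string.
   def group_by_value(table, agg_produce_value):
       grouped = {}
       for k, v in table.items():
           grouped.setdefault(v, []).append(agg_produce_value)
       return grouped

   table = {"a": 1, "b": 2, "c": 1}
   assert group_by_value(table, "A") == {1: ["A", "A"], 2: ["A"]}
   ```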



##########
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception {
         final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
             "test.run_fk_join",
             "false"));
+        final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(

Review Comment:
   Backported the table-aggregation step to older versions -- without it, the first app instances we start up don't have it.
   
   This must be done for other older versions we want to test, too.
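   The backported flag parsing boils down to a boolean property with a `"false"` default. A sketch of the same behavior in Python (a dict stands in for `java.util.Properties`; the property names match the PR):

   ```python
   # Sketch of Boolean.parseBoolean(props.getProperty(key, "false")).
   def parse_bool(props, key, default="false"):
       return props.get(key, default).lower() == "true"

   props = {"test.run_fk_join": "true"}
   assert parse_bool(props, "test.run_fk_join") is True
   assert parse_bool(props, "test.run_table_agg") is False  # absent -> defaults to false
   ```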



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
                     str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   We should add more versions here -- not sure how far back we want to go?



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {

Review Comment:
   Changed this slightly to avoid spamming the output
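   The throttling condition in the diff is: log whenever the aggregate actually changed, and otherwise only every Nth record (N=10 in the PR, for debugging). A sketch of that predicate:

   ```python
   # Sketch of the shouldLog condition from the diff: always log updates,
   # plus a periodic heartbeat every `every` records to show liveness.
   def should_log(updated, records_processed, every=10):
       return updated or records_processed % every == 0

   assert should_log(True, 7) is True    # any update is logged
   assert should_log(False, 20) is True  # periodic heartbeat
   assert should_log(False, 7) is False  # everything else is suppressed
   ```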



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -175,7 +176,7 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version):
                     self.perform_broker_upgrade(to_version)
 
                     log_monitor.wait_until(connected_message,
-                                           timeout_sec=120,
+                                           timeout_sec=60,

Review Comment:
   Not sure why this timeout was higher than all others. Side cleanup



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -236,6 +237,96 @@ def test_rolling_upgrade_with_2_bounces(self, from_version, to_version):
 
         self.stop_and_await()
 
+    @cluster(num_nodes=6)
+    @matrix(from_version=table_agg_versions, to_version=[str(DEV_VERSION)])
+    def test_rolling_upgrade_for_table_agg(self, from_version, to_version):
+        """
+        This test verifies that the cluster successfully upgrades despite changes in the table
+        repartition topic format.
+
+        Starts 3 KafkaStreams instances with version <from_version> and upgrades one-by-one to <to_version>
+        """
+
+        extra_properties = {'test.run_table_agg': 'true'}
+
+        self.set_up_services()
+
+        self.driver.start()
+
+        # encoding different target values for different versions
+        #  - old version: value=A
+        #  - new version with `upgrade_from` flag set: value=B
+        #  - new version w/o `upgrade_from` set: value=C
+
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A'
+        self.start_all_nodes_with(from_version, extra_properties)
+
+        counter = 1
+        random.seed()
+
+        # rolling bounce
+        random.shuffle(self.processors)
+        p3 = self.processors[-1]
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+
+        # bounce two instances to new version (verifies that new version can process records
+        # written by old version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        # bounce remaining instance on old version (just for verification purposes, to verify that
+        # instance on old version can process records written by new version)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'A'
+        extra_properties['test.expected_agg_values'] = 'A,B'
+        self.do_stop_start_bounce(p3, None, from_version, counter, extra_properties)
+        counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B')
+
+        # bounce remaining instance to new version (verifies that new version without upgrade_from
+        # can process records written by new version with upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'C'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        self.do_stop_start_bounce(p3, None, to_version, counter, extra_properties)
+        counter = counter + 1
+
+        # bounce first instances again without removing upgrade_from (just for verification purposes,
+        # to verify that instance with upgrade_from can process records written without upgrade_from)
+        extra_properties = extra_properties.copy()
+        extra_properties['test.agg_produce_value'] = 'B'
+        extra_properties['test.expected_agg_values'] = 'A,B,C'
+        for p in self.processors[:-1]:
+            self.do_stop_start_bounce(p, from_version[:-2], to_version, counter, extra_properties)
+            counter = counter + 1
+
+        self.wait_for_table_agg_success('A,B,C')
+
+        self.stop_and_await()
+
+    def wait_for_table_agg_success(self, expected_values):
+        agg_success_str = "Table aggregate processor saw expected values. Seen: " + expected_values
+        with self.processor1.node.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with self.processor2.node.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with self.processor3.node.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:

Review Comment:
   All three monitors need to tail STDOUT (not the LOG files)
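   If the triple nesting ever gets unwieldy, the same pattern can be flattened with `contextlib.ExitStack`. A self-contained sketch (the `monitor_log` stand-in below is illustrative; the real one comes from ducktape's remote account API):

   ```python
   # Sketch: open N log monitors without nesting N `with` blocks.
   from contextlib import ExitStack, contextmanager

   @contextmanager
   def monitor_log(name):  # stand-in for node.account.monitor_log(STDOUT_FILE)
       yield f"monitor:{name}"

   with ExitStack() as stack:
       monitors = [stack.enter_context(monitor_log(p))
                   for p in ("processor1", "processor2", "processor3")]

   assert monitors == ["monitor:processor1", "monitor:processor2", "monitor:processor3"]
   ```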



##########
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -69,14 +83,32 @@ public static void main(final String[] args) throws Exception {
                 System.err.println("Caught " + e.getMessage());
             }
         }
+        if (runTableAgg) {
+            final String aggProduceValue = streamsProperties.getProperty("test.agg_produce_value", "");
+            if (aggProduceValue.isEmpty()) {
+                System.err.printf("'%s' must be specified when '%s' is true.", "test.agg_produce_value", "test.run_table_agg");
+            }
+            final String expectedAggValuesStr = streamsProperties.getProperty("test.expected_agg_values", "");
+            if (expectedAggValuesStr.isEmpty()) {
+                System.err.printf("'%s' must be specified when '%s' is true.", "test.expected_agg_values", "test.run_table_agg");
+            }
+            final List<String> expectedAggValues = Arrays.asList(expectedAggValuesStr.split(","));
+
+            try {
+                buildTableAgg(dataTable, aggProduceValue, expectedAggValues);
+            } catch (final Exception e) {
+                System.err.println("Caught " + e.getMessage());
+            }
+        }
 
         final Properties config = new Properties();
         config.setProperty(
             StreamsConfig.APPLICATION_ID_CONFIG,
-            "StreamsUpgradeTest-" + new Random().nextLong());
+            "StreamsUpgradeTest");

Review Comment:
   Fix application.id (cf https://github.com/apache/kafka/pull/13654)
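   Presumably the problem with the old id (per the linked PR) is that the random suffix gave every launch its own `application.id`, i.e. its own consumer group and state, so the test's instances could not act as one application across bounces. A tiny Python sketch of the contrast (illustrative only):

   ```python
   # Sketch: a random suffix yields a different application.id on every launch,
   # while the fixed id is stable across bounced instances.
   import random

   def old_app_id():
       return "StreamsUpgradeTest-" + str(random.getrandbits(63))

   def new_app_id():
       return "StreamsUpgradeTest"

   assert old_app_id() != old_app_id()  # overwhelmingly likely: fresh id each call
   assert new_app_id() == new_app_id()  # stable across bounces
   ```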


