cadonna commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r663088560



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -128,45 +128,61 @@ def run_failure_and_recovery(self, processor1, 
processor2, processor3, verifier)
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % 
verifier.STDOUT_FILE, allow_fail=False)
 
     def add_streams(self, processor):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
monitor:
-            processor.start()
-            self.wait_for_startup(monitor, processor)
+        with processor.node.account.monitor_log(processor.LOG_FILE) as 
log_monitor:
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
stdout_monitor:
+                processor.start()
+                self.wait_for_running(stdout_monitor, processor)
+                self.wait_for_commit(log_monitor, processor)
 
     def add_streams2(self, running_processor, processor_to_be_started):
-        with 
running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as 
monitor:
-            self.add_streams(processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor)
+        with 
running_processor.node.account.monitor_log(running_processor.LOG_FILE) as 
log_monitor:
+            with 
running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as 
stdout_monitor:
+                self.add_streams(processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor)
+                self.wait_for_commit(log_monitor, running_processor)
 
     def add_streams3(self, running_processor1, running_processor2, 
processor_to_be_started):
-        with 
running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as 
monitor:
-            self.add_streams2(running_processor2, processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor1)
+        with 
running_processor1.node.account.monitor_log(running_processor1.LOG_FILE) as 
log_monitor:
+            with 
running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as 
stdout_monitor:
+                self.add_streams2(running_processor2, processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor1)
+                self.wait_for_commit(log_monitor, running_processor1)
 
     def stop_streams(self, processor_to_be_stopped):
         with 
processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE)
 as monitor2:
             processor_to_be_stopped.stop()
             self.wait_for(monitor2, processor_to_be_stopped, "StateChange: 
PENDING_SHUTDOWN -> NOT_RUNNING")
 
     def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-        with 
keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) 
as monitor:
-            self.stop_streams(processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor)
+        with 
keep_alive_processor.node.account.monitor_log(keep_alive_processor.LOG_FILE) as 
log_monitor:
+            with 
keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) 
as stdout_monitor:
+                self.stop_streams(processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor)
+                self.wait_for_commit(log_monitor, keep_alive_processor)
 
     def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, 
processor_to_be_stopped):
-        with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
+        with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) 
as log_monitor:
+            with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as stdout_monitor:
+                self.stop_streams2(keep_alive_processor2, 
processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor1)
+                self.wait_for_commit(log_monitor, keep_alive_processor1)
 
     def abort_streams(self, keep_alive_processor1, keep_alive_processor2, 
processor_to_be_aborted):
-        with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as monitor1:
-            with 
keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE)
 as monitor2:
-                processor_to_be_aborted.stop_nodes(False)
-            self.wait_for_startup(monitor2, keep_alive_processor2)
-        self.wait_for_startup(monitor1, keep_alive_processor1)
-
-    def wait_for_startup(self, monitor, processor):
+        with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) 
as log_monitor1:
+            with 
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as stdout_monitor1:
+                with 
keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.LOG_FILE) 
as log_monitor2:
+                    with 
keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
 as stdout_monitor2:
+                        processor_to_be_aborted.stop_nodes(False)
+                        self.wait_for_running(stdout_monitor2, 
keep_alive_processor2)
+                        self.wait_for_running(stdout_monitor1, 
keep_alive_processor1)
+                        self.wait_for_commit(log_monitor2, 
keep_alive_processor2)
+                        self.wait_for_commit(log_monitor1, 
keep_alive_processor1)
+
+    def wait_for_running(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> 
RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from 
topic")
+
+    def wait_for_commit(self, monitor, processor):
+        self.wait_for(monitor, processor, "Committed all active tasks 
\[[0-9_,]+\] and standby tasks \[[0-9_,]+\]")

Review comment:
       @mjsax I added this verification to ensure that the instance started to 
process data. It is a bit more complicated and a bit more brittle because it 
relies on DEBUG logs, but it works for active and standby tasks. WDYT?
   If you think it is OK, I will start testing the system test locally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to