C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1360904810
########## tests/kafkatest/tests/connect/connect_distributed_test.py: ########## @@ -375,6 +381,159 @@ def test_pause_state_persistent(self, exactly_once_source, connect_protocol, met wait_until(lambda: self.is_paused(self.source, node), timeout_sec=120, err_msg="Failed to see connector startup in PAUSED state") + @cluster(num_nodes=5) + def test_dynamic_logging(self): + """ + Test out the REST API for dynamically adjusting logging levels, on both a single-worker and cluster-wide basis. + """ + + self.setup_services(num_workers=3) + self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) + self.cc.start() + + worker = self.cc.nodes[0] + prior_all_loggers = self.cc.get_all_loggers(worker) + self.logger.debug("Listed all loggers via REST API: %s", str(prior_all_loggers)) + assert prior_all_loggers is not None + assert 'root' in prior_all_loggers + # We need root and at least one other namespace (the other namespace is checked + # later on to make sure that it hasn't changed) + assert len(prior_all_loggers) >= 2 + for logger in prior_all_loggers.values(): + assert logger['last_modified'] is None + + namespace = None + for logger in prior_all_loggers.keys(): + if logger != 'root': + namespace = logger + break + assert namespace is not None + + initial_level = self.cc.get_logger(worker, namespace)['level'].upper() + # Make sure we pick a different one than what's already set for that namespace + new_level = 'INFO' if initial_level != 'INFO' else 'WARN' + self.cc.set_logger(worker, namespace, 'ERROR') + request_time = int(time.time() * 1000) + affected_loggers = self.cc.set_logger(worker, namespace, new_level) + assert len(affected_loggers) >= 1 + for logger in affected_loggers: + assert logger.startswith(namespace) + + assert self.loggers_set(new_level, request_time, namespace, workers=[worker]) + assert self.loggers_set(initial_level, None, namespace, workers=self.cc.nodes[1:]) + + # Force all loggers to get updated by setting the 
root namespace to + # two different levels + # This guarantees that their last-modified times will be updated + resp = self.cc.set_logger(worker, 'root', 'DEBUG', 'cluster') + assert resp is None + + new_root = 'INFO' + request_time = int(time.time() * 1000) + resp = self.cc.set_logger(worker, 'root', new_root, 'cluster') + assert resp is None + wait_until( + lambda: self.loggers_set(new_root, request_time), + # This should be super quick--just a write+read of the config topic, which workers are constantly polling + timeout_sec=10, + err_msg="Log level for root namespace was not adjusted in a reasonable amount of time." + ) + + new_level = 'DEBUG' + request_time = int(time.time() * 1000) + resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') + assert resp is None + wait_until( + lambda: self.loggers_set(new_level, request_time, namespace), + timeout_sec=10, + err_msg='Log level for namespace ' + namespace + ' was not adjusted in a reasonable amount of time.' + ) + + prior_all_loggers = [self.cc.get_all_loggers(node) for node in self.cc.nodes] + resp = self.cc.set_logger(worker, namespace, new_level, 'cluster') + assert resp is None + + prior_namespace = namespace + new_namespace = None + for logger, level in prior_all_loggers[0].items(): + if logger != 'root' and not logger.startswith(namespace): Review Comment: That assertion (`num_loggers >= 3`) does not necessarily have to hold for this condition to be met, since altering a logging namespace also causes its child namespaces to be modified and to begin appearing in the response of the `GET /admin/loggers` endpoint. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org