This is an automated email from the ASF dual-hosted git repository. eladkal pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new b3be18b36f Use None instead of empty data structures when no ElasticSearch logs (#34793) b3be18b36f is described below commit b3be18b36f6dd96dd57717d6fa0484fc8f02805e Author: Daniel Standish <15932138+dstand...@users.noreply.github.com> AuthorDate: Tue Nov 7 22:40:37 2023 -0800 Use None instead of empty data structures when no ElasticSearch logs (#34793) * Use None instead of empty data structures when no ElasticSearch logs Previously, when there were no logs, we would just use [] or {} and continue to process them through functions such as "group by host" and such. But if no logs are found, it's odd to go down those paths. One example of the confusion is that the return type of `es_read` was `list | ElasticSearchResponse`. This makes it look like it would either be a list (presumably of logs?) or an ElasticSearchResponse object (presumably also containing logs?). But in reality, the only time a list was returned was when there _were no logs at all!_ This was just to adhere to the contract that we handle "no logs" the same as "logs". _(And it turns out that ElasticSearchResponse magically also behaves like [...] * use more accurate variable names * more accurate naming * fix when no hits --- airflow/providers/elasticsearch/log/es_response.py | 6 ++- .../providers/elasticsearch/log/es_task_handler.py | 54 ++++++++++++---------- 2 files changed, 34 insertions(+), 26 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_response.py b/airflow/providers/elasticsearch/log/es_response.py index ce11c715aa..6d8a3aeac3 100644 --- a/airflow/providers/elasticsearch/log/es_response.py +++ b/airflow/providers/elasticsearch/log/es_response.py @@ -16,6 +16,8 @@ # under the License.
from __future__ import annotations +from typing import Iterator + def _wrap(val): if isinstance(val, dict): @@ -117,7 +119,7 @@ class ElasticSearchResponse(AttributeDict): super().__setattr__("_doc_class", doc_class) super().__init__(response) - def __iter__(self): + def __iter__(self) -> Iterator[Hit]: return iter(self.hits) def __getitem__(self, key): @@ -129,7 +131,7 @@ class ElasticSearchResponse(AttributeDict): return bool(self.hits) @property - def hits(self): + def hits(self) -> list[Hit]: """ This property provides access to the hits (i.e., the results) of the Elasticsearch response. diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 6c97e25f3b..33a323a958 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -25,14 +25,13 @@ import time import warnings from collections import defaultdict from operator import attrgetter -from typing import TYPE_CHECKING, Any, Callable, List, Tuple +from typing import TYPE_CHECKING, Any, Callable, List, Literal, Tuple from urllib.parse import quote, urlparse # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test. 
import elasticsearch import pendulum from elasticsearch.exceptions import NotFoundError -from typing_extensions import Literal from airflow.configuration import conf from airflow.exceptions import AirflowProviderDeprecationWarning @@ -136,7 +135,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix super().__init__(base_log_folder, filename_template) self.closed = False - self.client = elasticsearch.Elasticsearch(host, **es_kwargs) # type: ignore[attr-defined] + self.client = elasticsearch.Elasticsearch(host, **es_kwargs) # in airflow.cfg, host of elasticsearch has to be http://dockerhostXxxx:9200 if USE_PER_RUN_LOG_ID and log_id_template is not None: warnings.warn( @@ -237,12 +236,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix return "" return value.strftime("%Y_%m_%dT%H_%M_%S_%f") - def _group_logs_by_host(self, logs): + def _group_logs_by_host(self, response: ElasticSearchResponse) -> dict[str, list[Hit]]: grouped_logs = defaultdict(list) - for log in logs: - key = getattr_nested(log, self.host_field, None) or "default_host" - grouped_logs[key].append(log) - + for hit in response: + key = getattr_nested(hit, self.host_field, None) or "default_host" + grouped_logs[key].append(hit) return grouped_logs def _read_grouped_logs(self): @@ -267,9 +265,14 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix offset = metadata["offset"] log_id = self._render_log_id(ti, try_number) - logs = self._es_read(log_id, offset) - logs_by_host = self._group_logs_by_host(logs) - next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1]) + response = self._es_read(log_id, offset) + if response is not None and response.hits: + logs_by_host = self._group_logs_by_host(response) + next_offset = attrgetter(self.offset_field)(response[-1]) + else: + logs_by_host = None + next_offset = offset + # Ensure a string here. 
Large offset numbers will get JSON.parsed incorrectly # on the client. Sending as a string prevents this issue. # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER @@ -278,8 +281,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix # end_of_log_mark may contain characters like '\n' which is needed to # have the log uploaded but will not be stored in elasticsearch. metadata["end_of_log"] = False - if any(x[-1].message == self.end_of_log_mark for x in logs_by_host.values()): - metadata["end_of_log"] = True + if logs_by_host: + if any(x[-1].message == self.end_of_log_mark for x in logs_by_host.values()): + metadata["end_of_log"] = True cur_ts = pendulum.now() if "last_log_timestamp" in metadata: @@ -308,27 +312,30 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix # If we hit the end of the log, remove the actual end_of_log message # to prevent it from showing in the UI. - def concat_logs(lines): - log_range = (len(lines) - 1) if lines[-1].message == self.end_of_log_mark else len(lines) - return "\n".join(self._format_msg(lines[i]) for i in range(log_range)) + def concat_logs(hits: list[Hit]): + log_range = (len(hits) - 1) if hits[-1].message == self.end_of_log_mark else len(hits) + return "\n".join(self._format_msg(hits[i]) for i in range(log_range)) - message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host.items()] + if logs_by_host: + message = [(host, concat_logs(hits)) for host, hits in logs_by_host.items()] + else: + message = [] return message, metadata - def _format_msg(self, log_line): + def _format_msg(self, hit: Hit): """Format ES Record to match settings.LOG_FORMAT when used with json_format.""" # Using formatter._style.format makes it future proof i.e. 
# if we change the formatter style from '%' to '{' or '$', this will still work if self.json_format: with contextlib.suppress(Exception): return self.formatter._style.format( - logging.makeLogRecord({**LOG_LINE_DEFAULTS, **log_line.to_dict()}) + logging.makeLogRecord({**LOG_LINE_DEFAULTS, **hit.to_dict()}) ) # Just a safe-guard to preserve backwards-compatibility - return log_line.message + return hit.message - def _es_read(self, log_id: str, offset: int | str) -> list | ElasticSearchResponse: + def _es_read(self, log_id: str, offset: int | str) -> ElasticSearchResponse | None: """ Return the logs matching log_id in Elasticsearch and next offset or ''. @@ -352,7 +359,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix self.log.exception("The target index pattern %s does not exist", self.index_patterns) raise e - logs: list[Any] | ElasticSearchResponse = [] if max_log_line != 0: try: query.update({"sort": [self.offset_field]}) @@ -362,11 +368,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix size=self.MAX_LINE_PER_PAGE, from_=self.MAX_LINE_PER_PAGE * self.PAGE, ) - logs = ElasticSearchResponse(self, res) + return ElasticSearchResponse(self, res) except Exception as err: self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err) - return logs + return None def emit(self, record): if self.handler: