This is an automated email from the ASF dual-hosted git repository. pankajkoti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 747f00f2aa Extend task context logging support for remote logging using Elasticsearch (#32977) 747f00f2aa is described below commit 747f00f2aa159642f3b2dddbb9908c01b8b3b91c Author: Pankaj Koti <pankajkoti...@gmail.com> AuthorDate: Tue Nov 21 11:55:03 2023 +0530 Extend task context logging support for remote logging using Elasticsearch (#32977) * Extend task context logging support for remote logging using Elasticsearch With the addition of the task context logging feature in PR #32646, this PR extends the feature to Elasticsearch when it is set as the remote logging store. Here, backward compatibility is ensured for older versions of Airflow that do not have the feature included in Airflow Core. * update ensure_ti --------- Co-authored-by: Daniel Standish <15932138+dstand...@users.noreply.github.com> --- .../providers/elasticsearch/log/es_task_handler.py | 46 +++++++++++++++++++--- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 79f9ad0b41..1e8c75b7e3 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -34,7 +34,7 @@ import pendulum from elasticsearch.exceptions import NotFoundError from airflow.configuration import conf -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models.dagrun import DagRun from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit @@ -46,7 +46,8 @@ from airflow.utils.session import create_session if TYPE_CHECKING: from datetime import datetime - from airflow.models.taskinstance import TaskInstance + from 
airflow.models.taskinstance import 
TaskInstance, TaskInstanceKey + LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} # Elasticsearch hosted log type @@ -84,6 +85,32 @@ def get_es_kwargs_from_config() -> dict[str, Any]: return kwargs_dict +def _ensure_ti(ti: TaskInstanceKey | TaskInstance, session) -> TaskInstance: + """Given TI | TIKey, return a TI object. + + Will raise exception if no TI is found in the database. + """ + from airflow.models.taskinstance import TaskInstance, TaskInstanceKey + + if not isinstance(ti, TaskInstanceKey): + return ti + val = ( + session.query(TaskInstance) + .filter( + TaskInstance.task_id == ti.task_id, + TaskInstance.dag_id == ti.dag_id, + TaskInstance.run_id == ti.run_id, + TaskInstance.map_index == ti.map_index, + ) + .one_or_none() + ) + if isinstance(val, TaskInstance): + val._try_number = ti.try_number + return val + else: + raise AirflowException(f"Could not find TaskInstance for {ti}") + + class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin): """ ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch. 
@@ -182,8 +209,12 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix return host - def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: + def _render_log_id(self, ti: TaskInstance | TaskInstanceKey, try_number: int) -> str: + from airflow.models.taskinstance import TaskInstanceKey + with create_session() as session: + if isinstance(ti, TaskInstanceKey): + ti = _ensure_ti(ti, session) dag_run = ti.get_dagrun(session=session) if USE_PER_RUN_LOG_ID: log_id_template = dag_run.get_log_template(session=session).elasticsearch_id @@ -377,11 +408,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix setattr(record, self.offset_field, int(time.time() * (10**9))) self.handler.emit(record) - def set_context(self, ti: TaskInstance, **kwargs) -> None: + def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None: """ Provide task_instance context to airflow task handler. :param ti: task instance object + :param identifier: if set, identifies the Airflow component which is relaying logs from + exceptional scenarios related to the task instance """ is_trigger_log_context = getattr(ti, "is_trigger_log_context", None) is_ti_raw = getattr(ti, "raw", None) @@ -410,7 +443,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix self.handler.setLevel(self.level) self.handler.setFormatter(self.formatter) else: - super().set_context(ti) + if getattr(self, "supports_task_context_logging", False): + super().set_context(ti, identifier=identifier) + else: + super().set_context(ti) self.context_set = True def close(self) -> None: