This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 05fe5a4ccf2 Add write feature to ESTaskHandler (#44973)
05fe5a4ccf2 is described below

commit 05fe5a4ccf23bd2775d64ba552b1e57c67c87cfd
Author: Owen Leung <[email protected]>
AuthorDate: Tue Jan 14 00:15:32 2025 +0800

    Add write feature to ESTaskHandler (#44973)
    
    * Add write feature to ESTaskHandler
---
 airflow/config_templates/airflow_local_settings.py |  4 ++
 .../provider_config_fallback_defaults.cfg          |  2 +
 .../logging/index.rst                              | 19 +++++++
 docs/spelling_wordlist.txt                         |  1 +
 .../providers/elasticsearch/log/es_task_handler.py | 62 ++++++++++++++++++++--
 .../airflow/providers/elasticsearch/provider.yaml  | 14 +++++
 .../elasticsearch/log/test_es_task_handler.py      | 26 +++++++--
 7 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py 
b/airflow/config_templates/airflow_local_settings.py
index cee5b428f6d..f440261dafc 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -308,8 +308,10 @@ if REMOTE_LOGGING:
         ELASTICSEARCH_END_OF_LOG_MARK: str = 
conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
         ELASTICSEARCH_FRONTEND: str = 
conf.get_mandatory_value("elasticsearch", "frontend")
         ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", 
"WRITE_STDOUT")
+        ELASTICSEARCH_WRITE_TO_ES: bool = conf.getboolean("elasticsearch", 
"WRITE_TO_ES")
         ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", 
"JSON_FORMAT")
         ELASTICSEARCH_JSON_FIELDS: str = 
conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
+        ELASTICSEARCH_TARGET_INDEX: str = 
conf.get_mandatory_value("elasticsearch", "TARGET_INDEX")
         ELASTICSEARCH_HOST_FIELD: str = 
conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
         ELASTICSEARCH_OFFSET_FIELD: str = 
conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
 
@@ -322,6 +324,8 @@ if REMOTE_LOGGING:
                 "host": ELASTICSEARCH_HOST,
                 "frontend": ELASTICSEARCH_FRONTEND,
                 "write_stdout": ELASTICSEARCH_WRITE_STDOUT,
+                "write_to_es": ELASTICSEARCH_WRITE_TO_ES,
+                "target_index": ELASTICSEARCH_TARGET_INDEX,
                 "json_format": ELASTICSEARCH_JSON_FORMAT,
                 "json_fields": ELASTICSEARCH_JSON_FIELDS,
                 "host_field": ELASTICSEARCH_HOST_FIELD,
diff --git a/airflow/config_templates/provider_config_fallback_defaults.cfg 
b/airflow/config_templates/provider_config_fallback_defaults.cfg
index ba92feaef47..b49c633c5af 100644
--- a/airflow/config_templates/provider_config_fallback_defaults.cfg
+++ b/airflow/config_templates/provider_config_fallback_defaults.cfg
@@ -82,6 +82,8 @@ json_fields = asctime, filename, lineno, levelname, message
 host_field = host
 offset_field = offset
 index_patterns = _all
+write_to_es = False
+target_index = airflow-logs
 
 [elasticsearch_configs]
 use_ssl = False
diff --git a/docs/apache-airflow-providers-elasticsearch/logging/index.rst 
b/docs/apache-airflow-providers-elasticsearch/logging/index.rst
index 7b56958cafe..eaa46def53b 100644
--- a/docs/apache-airflow-providers-elasticsearch/logging/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/logging/index.rst
@@ -22,6 +22,8 @@ Writing logs to Elasticsearch
 
 Airflow can be configured to read task logs from Elasticsearch and optionally 
write logs to stdout in standard or json format. These logs can later be 
collected and forwarded to the Elasticsearch cluster using tools like fluentd, 
logstash or others.
 
+Airflow also supports writing logs to Elasticsearch directly without requiring 
additional software like filebeat and logstash. To enable this feature, set 
``write_to_es`` and ``json_format`` to ``True`` and ``write_stdout`` to 
``False`` in ``airflow.cfg``. Please be aware that if you set both 
``write_to_es`` and ``delete_local_logs`` in the logging section to true, Airflow 
will delete the local copy of task logs upon successfully writing task logs to 
Elasticsearch.
+
 You can choose to have all task logs from workers output to the highest parent 
level process, instead of the standard file locations. This allows for some 
additional flexibility in container environments like Kubernetes, where 
container stdout is already being logged to the host nodes. From there a log 
shipping tool can be used to forward them along to Elasticsearch. To use this 
feature, set the ``write_stdout`` option in ``airflow.cfg``.
 You can also choose to have the logs output in a JSON format, using the 
``json_format`` option. Airflow uses the standard Python logging module and 
JSON fields are directly extracted from the LogRecord object. To use this 
feature, set the ``json_fields`` option in ``airflow.cfg``. Add the fields to 
the comma-delimited string that you want collected for the logs. These fields 
are from the LogRecord object in the ``logging`` module. `Documentation on 
different attributes can be found here  [...]
 
@@ -47,6 +49,21 @@ To output task logs to stdout in JSON format, the following 
config could be used
     write_stdout = True
     json_format = True
 
+To output task logs to Elasticsearch, the following config could be used: (set 
``delete_local_logs`` to true if you don't want to retain a local copy of task logs)
+
+.. code-block:: ini
+
+    [logging]
+    remote_logging = True
+    delete_local_logs = False
+
+    [elasticsearch]
+    host = <host>:<port>
+    write_stdout = False
+    json_format = True
+    write_to_es = True
+    target_index = [name of the index to store logs]
+
 .. _write-logs-elasticsearch-tls:
 
 Writing logs to Elasticsearch over TLS
@@ -55,6 +72,8 @@ Writing logs to Elasticsearch over TLS
 To add custom configurations to ElasticSearch (e.g. turning on ``ssl_verify``, 
adding a custom self-signed
 cert, etc.) use the ``elasticsearch_configs`` setting in your ``airflow.cfg``
 
+Note that these configurations also apply when you enable writing logs to 
Elasticsearch
+
 .. code-block:: ini
 
     [logging]
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index dafe706a1c3..d1a1e62d521 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -633,6 +633,7 @@ fetchmany
 fetchone
 FieldMask
 Filebeat
+filebeat
 filehandle
 fileloc
 filelocs
diff --git 
a/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py 
b/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
index 241fb14aa23..9881f1ac5dc 100644
--- a/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -19,7 +19,11 @@ from __future__ import annotations
 
 import contextlib
 import inspect
+import json
 import logging
+import os
+import pathlib
+import shutil
 import sys
 import time
 from collections import defaultdict
@@ -30,6 +34,7 @@ from urllib.parse import quote, urlparse
 # Using `from elasticsearch import *` would break elasticsearch mocking used 
in unit test.
 import elasticsearch
 import pendulum
+from elasticsearch import helpers
 from elasticsearch.exceptions import NotFoundError
 
 from airflow.configuration import conf
@@ -106,10 +111,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
     """
     ElasticsearchTaskHandler is a python log handler that reads logs from 
Elasticsearch.
 
-    Note that Airflow does not handle the indexing of logs into Elasticsearch. 
Instead,
+    Note that Airflow by default does not handle the indexing of logs into 
Elasticsearch. Instead,
     Airflow flushes logs into local files. Additional software setup is 
required to index
     the logs into Elasticsearch, such as using Filebeat and Logstash.
 
+    Airflow can be configured to support directly writing logs to 
Elasticsearch. To enable this feature,
+    set `json_format` and `write_to_es` to `True`.
+
     To efficiently query and sort Elasticsearch results, this handler assumes 
each
     log message has a field `log_id` consists of ti primary keys:
     `log_id = {dag_id}-{task_id}-{logical_date}-{try_number}`
@@ -136,6 +144,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         write_stdout: bool,
         json_format: bool,
         json_fields: str,
+        write_to_es: bool = False,
+        target_index: str = "airflow-logs",
         host_field: str = "host",
         offset_field: str = "offset",
         host: str = "http://localhost:9200";,
@@ -166,6 +176,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         self.index_patterns = index_patterns
         self.index_patterns_callable = index_patterns_callable
         self.context_set = False
+        self.write_to_es = write_to_es
+        self.target_index = target_index
+        self.delete_local_copy = kwargs.get(
+            "delete_local_copy", conf.getboolean("logging", 
"delete_local_logs")
+        )
 
         self.formatter: logging.Formatter
         self.handler: logging.FileHandler | logging.StreamHandler  # type: 
ignore[assignment]
@@ -428,9 +443,11 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
                 extras={
                     "dag_id": str(ti.dag_id),
                     "task_id": str(ti.task_id),
-                    date_key: self._clean_date(ti.logical_date)
-                    if AIRFLOW_V_3_0_PLUS
-                    else self._clean_date(ti.execution_date),
+                    date_key: (
+                        self._clean_date(ti.logical_date)
+                        if AIRFLOW_V_3_0_PLUS
+                        else self._clean_date(ti.execution_date)
+                    ),
                     "try_number": str(ti.try_number),
                     "log_id": self._render_log_id(ti, ti.try_number),
                 },
@@ -480,6 +497,18 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
             self.handler.close()
             sys.stdout = sys.__stdout__
 
+        if self.write_to_es and not self.write_stdout:
+            full_path = self.handler.baseFilename  # type: ignore[union-attr]
+            log_relative_path = 
pathlib.Path(full_path).relative_to(self.local_base).as_posix()
+            local_loc = os.path.join(self.local_base, log_relative_path)
+            if os.path.exists(local_loc):
+                # read log and remove old logs to get just the latest additions
+                log = pathlib.Path(local_loc).read_text()
+                log_lines = self._parse_raw_log(log)
+                success = self._write_to_es(log_lines)
+                if success and self.delete_local_copy:
+                    shutil.rmtree(os.path.dirname(local_loc))
+
         super().close()
 
         self.closed = True
@@ -599,6 +628,31 @@ class ElasticsearchTaskHandler(FileTaskHandler, 
ExternalLoggingMixin, LoggingMix
         callback: type[Hit] | Callable[..., Any] = getattr(doc_class, 
"from_es", doc_class)
         return callback(hit)
 
+    def _parse_raw_log(self, log: str) -> list[dict[str, Any]]:
+        logs = log.split("\n")
+        parsed_logs = []
+        for line in logs:
+            # Make sure line is not empty
+            if line.strip():
+                parsed_logs.append(json.loads(line))
+
+        return parsed_logs
+
+    def _write_to_es(self, log_lines: list[dict[str, Any]]) -> bool:
+        """
+        Write the logs to Elasticsearch; return `True` on success, or log the 
exception and return `False` on failure.
+
+        :param log_lines: the log_lines to write to the ElasticSearch.
+        """
+        # Prepare the bulk request for Elasticsearch
+        bulk_actions = [{"_index": self.target_index, "_source": log} for log 
in log_lines]
+        try:
+            _ = helpers.bulk(self.client, bulk_actions)
+            return True
+        except Exception as e:
+            self.log.exception("Unable to insert logs into Elasticsearch. 
Reason: %s", str(e))
+            return False
+
 
 def getattr_nested(obj, item, default):
     """
diff --git a/providers/src/airflow/providers/elasticsearch/provider.yaml 
b/providers/src/airflow/providers/elasticsearch/provider.yaml
index a1ddbae845c..88ebba2a510 100644
--- a/providers/src/airflow/providers/elasticsearch/provider.yaml
+++ b/providers/src/airflow/providers/elasticsearch/provider.yaml
@@ -136,6 +136,20 @@ config:
         type: string
         example: ~
         default: "False"
+      write_to_es:
+        description: |
+          Write the task logs to Elasticsearch
+        version_added: 5.5.4
+        type: string
+        example: ~
+        default: "False"
+      target_index:
+        description: |
+          Name of the index to write to, when enabling writing the task logs 
to Elasticsearch
+        version_added: 5.5.4
+        type: string
+        example: ~
+        default: "airflow-logs"
       json_format:
         description: |
           Instead of the default log formatter, write the log lines as JSON
diff --git a/providers/tests/elasticsearch/log/test_es_task_handler.py 
b/providers/tests/elasticsearch/log/test_es_task_handler.py
index f87d4ffb143..f6b3f793950 100644
--- a/providers/tests/elasticsearch/log/test_es_task_handler.py
+++ b/providers/tests/elasticsearch/log/test_es_task_handler.py
@@ -81,9 +81,11 @@ class TestElasticsearchTaskHandler:
     def ti(self, create_task_instance, create_log_template):
         create_log_template(
             self.FILENAME_TEMPLATE,
-            "{dag_id}-{task_id}-{logical_date}-{try_number}"
-            if AIRFLOW_V_3_0_PLUS
-            else "{dag_id}-{task_id}-{execution_date}-{try_number}",
+            (
+                "{dag_id}-{task_id}-{logical_date}-{try_number}"
+                if AIRFLOW_V_3_0_PLUS
+                else "{dag_id}-{task_id}-{execution_date}-{try_number}"
+            ),
         )
         yield get_ti(
             dag_id=self.DAG_ID,
@@ -673,6 +675,24 @@ class TestElasticsearchTaskHandler:
             filename_template=None,
         )
 
+    def test_write_to_es(self, ti):
+        self.es_task_handler.write_to_es = True
+        self.es_task_handler.json_format = True
+        self.es_task_handler.write_stdout = False
+        self.es_task_handler.local_base = Path(os.getcwd()) / "local" / "log" 
/ "location"
+        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s 
- %(message)s")
+        self.es_task_handler.formatter = formatter
+
+        self.es_task_handler.set_context(ti)
+        with patch(
+            
"airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler._write_to_es"
+        ) as mock_write_to_es:
+            mock_write = Mock(return_value=True)
+            mock_write_to_es.return_value = mock_write
+            self.es_task_handler._write_to_es = mock_write_to_es
+            self.es_task_handler.close()
+            mock_write_to_es.assert_called_once()
+
 
 def test_safe_attrgetter():
     class A: ...

Reply via email to