This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c47f51655c Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse (#35697)
c47f51655c is described below

commit c47f51655c369e2045283e0a9c1d65c32f231f38
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Fri Nov 17 22:49:56 2023 +0800

    Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse (#35697)
    
    * Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse
    
    * Remove es from OVERLOOKED_TESTS
---
 tests/always/test_project_structure.py             |   2 -
 .../elasticsearch/log/test_es_json_formatter.py    |  90 +++++++++
 .../elasticsearch/log/test_es_response.py          | 211 +++++++++++++++++++++
 3 files changed, 301 insertions(+), 2 deletions(-)

diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 74b9ada848..a58f33d755 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -109,8 +109,6 @@ class TestProjectStructure:
             "tests/providers/daskexecutor/executors/test_dask_executor.py",
             "tests/providers/databricks/hooks/test_databricks_base.py",
             "tests/providers/docker/test_exceptions.py",
-            "tests/providers/elasticsearch/log/test_es_json_formatter.py",
-            "tests/providers/elasticsearch/log/test_es_response.py",
             "tests/providers/google/cloud/fs/test_gcs.py",
             "tests/providers/google/cloud/links/test_automl.py",
             "tests/providers/google/cloud/links/test_base.py",
diff --git a/tests/providers/elasticsearch/log/test_es_json_formatter.py b/tests/providers/elasticsearch/log/test_es_json_formatter.py
new file mode 100644
index 0000000000..4e43c683a1
--- /dev/null
+++ b/tests/providers/elasticsearch/log/test_es_json_formatter.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+
+import pendulum
+import pytest
+
+from airflow.providers.elasticsearch.log.es_task_handler import (
+    ElasticsearchJSONFormatter,
+)
+
+
+class TestElasticsearchJSONFormatter:
+    JSON_FIELDS = ["asctime", "filename", "lineno", "levelname", "message", "exc_text"]
+    EXTRA_FIELDS = {
+        "dag_id": "dag1",
+        "task_id": "task1",
+        "execution_date": "2023-11-17",
+        "try_number": "1",
+        "log_id": "Some_log_id",
+    }
+
+    @pytest.fixture()
+    def es_json_formatter(self):
+        return ElasticsearchJSONFormatter()
+
+    @pytest.fixture()
+    def log_record(self):
+        return logging.LogRecord(
+            name="test",
+            level=logging.INFO,
+            pathname="test_file.txt",
+            lineno=1,
+            msg="Test message",
+            args=(),
+            exc_info=None,
+        )
+
+    def test_format_log_record(self, es_json_formatter, log_record):
+        """Test the log record formatting."""
+        es_json_formatter.json_fields = self.JSON_FIELDS
+        formatted = es_json_formatter.format(log_record)
+        data = json.loads(formatted)
+        assert all(key in self.JSON_FIELDS for key in data.keys())
+        assert data["filename"] == "test_file.txt"
+        assert data["lineno"] == 1
+        assert data["levelname"] == "INFO"
+        assert data["message"] == "Test message"
+
+    def test_formattime_in_iso8601_format(self, es_json_formatter, log_record):
+        es_json_formatter.json_fields = ["asctime"]
+        iso8601_format = es_json_formatter.formatTime(log_record)
+        try:
+            pendulum.parse(iso8601_format, strict=True)
+        except ValueError:
+            raise Exception("Time is not in ISO8601 format")
+
+    def test_extra_fields(self, es_json_formatter, log_record):
+        es_json_formatter.json_fields = self.JSON_FIELDS
+        es_json_formatter.extras = self.EXTRA_FIELDS
+        formatted = es_json_formatter.format(log_record)
+        data = json.loads(formatted)
+        assert all((key in self.JSON_FIELDS or key in self.EXTRA_FIELDS) for key in data.keys())
+        assert data["filename"] == "test_file.txt"
+        assert data["lineno"] == 1
+        assert data["levelname"] == "INFO"
+        assert data["dag_id"] == "dag1"
+        assert data["task_id"] == "task1"
+        assert data["execution_date"] == "2023-11-17"
+        assert data["try_number"] == "1"
+        assert data["log_id"] == "Some_log_id"
diff --git a/tests/providers/elasticsearch/log/test_es_response.py b/tests/providers/elasticsearch/log/test_es_response.py
new file mode 100644
index 0000000000..30c41f8d92
--- /dev/null
+++ b/tests/providers/elasticsearch/log/test_es_response.py
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+import pytest
+
+from airflow.providers.elasticsearch.log.es_response import (
+    AttributeDict,
+    AttributeList,
+    ElasticSearchResponse,
+    Hit,
+    HitMeta,
+    _wrap,
+)
+from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
+
+
+class TestWrap:
+    def test_wrap_with_dict(self):
+        test_dict = {"key1": "value1"}
+        result = _wrap(test_dict)
+        assert isinstance(result, AttributeDict)
+        assert result.key1 == "value1"
+
+    def test_wrap_with_non_dict(self):
+        test_values = [1, [2, 3], "string", 4.5]
+        for value in test_values:
+            assert _wrap(value) == value
+
+
+class TestAttributeList:
+    def test_initialization(self):
+        test_list = [1, 2, 3]
+        attr_list = AttributeList(test_list)
+        assert attr_list._l_ == test_list
+
+        test_tuple = (1, 2, 3)
+        attr_list = AttributeList(test_tuple)
+        assert attr_list._l_ == list(test_tuple)
+
+    def test_index_access(self):
+        test_list = [1, {"key1": "value1"}, 3]
+        attr_list = AttributeList(test_list)
+
+        assert attr_list[0] == 1
+        assert isinstance(attr_list[1], AttributeDict)
+        assert attr_list[1].key1 == "value1"
+        assert attr_list[2] == 3
+
+    def test_iteration(self):
+        test_list = [1, {"key": "value"}, 3]
+        attr_list = AttributeList(test_list)
+
+        for i, item in enumerate(attr_list):
+            if isinstance(test_list[i], dict):
+                assert isinstance(item, AttributeDict)
+            else:
+                assert item == test_list[i]
+
+    def test_boolean_representation(self):
+        assert AttributeList([1, 2, 3])
+        assert not (AttributeList([]))
+
+
+class TestAttributeDict:
+    def test_initialization(self):
+        test_dict = {"key1": "value1", "key2": "value2"}
+        attr_dict = AttributeDict(test_dict)
+        assert attr_dict._d_ == test_dict
+
+    def test_attribute_access(self):
+        test_dict = {"key1": "value1", "key2": {"subkey1": "subvalue1"}}
+        attr_dict = AttributeDict(test_dict)
+
+        assert attr_dict.key1 == "value1"
+        assert isinstance(attr_dict.key2, AttributeDict)
+        assert attr_dict.key2.subkey1 == "subvalue1"
+
+    def test_item_access(self):
+        test_dict = {"key1": "value1", "key2": "value2"}
+        attr_dict = AttributeDict(test_dict)
+
+        assert attr_dict["key1"] == "value1"
+        assert attr_dict["key2"] == "value2"
+
+    def test_nonexistent_key(self):
+        test_dict = {"key1": "value1"}
+        attr_dict = AttributeDict(test_dict)
+
+        with pytest.raises(AttributeError):
+            _ = attr_dict.nonexistent_key
+
+    def test_to_dict(self):
+        test_dict = {"key1": "value1", "key2": "value2"}
+        attr_dict = AttributeDict(test_dict)
+        assert attr_dict.to_dict() == test_dict
+
+
+class TestHitAndHitMetaAndElasticSearchResponse:
+    ES_DOCUMENT: dict[str, Any] = {
+        "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7},
+        "hits": {
+            "hits": [
+                {
+                    "_id": "jdeZT4kBjAZqZnexVUxk",
+                    "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                    "_score": 2.482621,
+                    "_source": {
+                        "@timestamp": "2023-07-13T14:13:15.140Z",
+                        "asctime": "2023-07-09T07:47:43.907+0000",
+                        "container": {"id": "airflow"},
+                        "dag_id": "example_bash_operator",
+                        "ecs": {"version": "8.0.0"},
+                        "execution_date": "2023_07_09T07_47_32_000000",
+                        "filename": "taskinstance.py",
+                        "input": {"type": "log"},
+                        "levelname": "INFO",
+                        "lineno": 1144,
+                        "log": {
+                            "file": {
+                                "path": "/opt/airflow/Documents/GitHub/airflow/logs/"
+                                "dag_id=example_bash_operator'"
+                                "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                            },
+                            "offset": 0,
+                        },
+                        "log.offset": 1688888863907337472,
+                        "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
+                        "message": "Dependencies all met for "
+                        "dep_context=non-requeueable deps "
+                        "ti=<TaskInstance: "
+                        "example_bash_operator.run_after_loop "
+                        "owen_run_run [queued]>",
+                        "task_id": "run_after_loop",
+                        "try_number": "1",
+                    },
+                    "_type": "_doc",
+                }
+            ]
+        },
+    }
+    HIT_DOCUMENT = ES_DOCUMENT["hits"]["hits"][0]
+
+    def test_hit_initialization_and_to_dict(self):
+        hit = Hit(self.HIT_DOCUMENT)
+
+        assert hit.asctime == "2023-07-09T07:47:43.907+0000"
+        assert hit.dag_id == "example_bash_operator"
+        assert hit.lineno == 1144
+        assert (
+            hit.log.file.path
+            == "/opt/airflow/Documents/GitHub/airflow/logs/dag_id=example_bash_operator'/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+        )
+
+        # Test meta attribute
+        assert isinstance(hit.meta, HitMeta)
+        assert hit.to_dict() == self.HIT_DOCUMENT["_source"]
+
+    def test_hitmeta_initialization_and_to_dict(self):
+        hitmeta = HitMeta(self.HIT_DOCUMENT)
+
+        assert hitmeta.id == "jdeZT4kBjAZqZnexVUxk"
+        assert hitmeta.index == ".ds-filebeat-8.8.2-2023.07.09-000001"
+        assert hitmeta.score == 2.482621
+        assert hitmeta.doc_type == "_doc"
+
+        expected_dict = {
+            k[1:] if k.startswith("_") else k: v for (k, v) in self.HIT_DOCUMENT.items() if k != "_source"
+        }
+        expected_dict["doc_type"] = expected_dict.pop("type")
+        assert hitmeta.to_dict() == expected_dict
+
+    def test_elasticsearchresponse_initialization_and_hits_and_bool(self):
+        task_handler = ElasticsearchTaskHandler(
+            base_log_folder="local/log/location",
+            end_of_log_mark="end_of_log\n",
+            write_stdout=False,
+            json_format=False,
+            json_fields="asctime,filename,lineno,levelname,message,exc_text",
+        )
+        response = ElasticSearchResponse(task_handler, self.ES_DOCUMENT)
+
+        assert response._d_ == self.ES_DOCUMENT
+        assert isinstance(response.hits, AttributeList)
+
+        for hit in response.hits:
+            assert isinstance(hit, Hit)
+            assert isinstance(hit.meta, HitMeta)
+
+        assert response.hits[0].asctime == "2023-07-09T07:47:43.907+0000"
+        assert response.hits[0].levelname == "INFO"
+
+        assert bool(response) is True

Reply via email to