This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new c47f51655c  Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse (#35697)
c47f51655c is described below

commit c47f51655c369e2045283e0a9c1d65c32f231f38
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Fri Nov 17 22:49:56 2023 +0800

    Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse (#35697)

    * Add unittest for ElasticSearchJSONFormatter and ElasticSearchResponse

    * Remove es from OVERLOOKED_TESTS
---
 tests/always/test_project_structure.py            |   2 -
 .../elasticsearch/log/test_es_json_formatter.py   |  90 +++++++++
 .../elasticsearch/log/test_es_response.py         | 211 +++++++++++++++++++++
 3 files changed, 301 insertions(+), 2 deletions(-)

diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py
index 74b9ada848..a58f33d755 100644
--- a/tests/always/test_project_structure.py
+++ b/tests/always/test_project_structure.py
@@ -109,8 +109,6 @@ class TestProjectStructure:
         "tests/providers/daskexecutor/executors/test_dask_executor.py",
         "tests/providers/databricks/hooks/test_databricks_base.py",
         "tests/providers/docker/test_exceptions.py",
-        "tests/providers/elasticsearch/log/test_es_json_formatter.py",
-        "tests/providers/elasticsearch/log/test_es_response.py",
         "tests/providers/google/cloud/fs/test_gcs.py",
         "tests/providers/google/cloud/links/test_automl.py",
         "tests/providers/google/cloud/links/test_base.py",
diff --git a/tests/providers/elasticsearch/log/test_es_json_formatter.py b/tests/providers/elasticsearch/log/test_es_json_formatter.py
new file mode 100644
index 0000000000..4e43c683a1
--- /dev/null
+++ b/tests/providers/elasticsearch/log/test_es_json_formatter.py
@@ -0,0 +1,90 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import json
+import logging
+
+import pendulum
+import pytest
+
+from airflow.providers.elasticsearch.log.es_task_handler import (
+    ElasticsearchJSONFormatter,
+)
+
+
+class TestElasticsearchJSONFormatter:
+    JSON_FIELDS = ["asctime", "filename", "lineno", "levelname", "message", "exc_text"]
+    EXTRA_FIELDS = {
+        "dag_id": "dag1",
+        "task_id": "task1",
+        "execution_date": "2023-11-17",
+        "try_number": "1",
+        "log_id": "Some_log_id",
+    }
+
+    @pytest.fixture()
+    def es_json_formatter(self):
+        return ElasticsearchJSONFormatter()
+
+    @pytest.fixture()
+    def log_record(self):
+        return logging.LogRecord(
+            name="test",
+            level=logging.INFO,
+            pathname="test_file.txt",
+            lineno=1,
+            msg="Test message",
+            args=(),
+            exc_info=None,
+        )
+
+    def test_format_log_record(self, es_json_formatter, log_record):
+        """Test the log record formatting."""
+        es_json_formatter.json_fields = self.JSON_FIELDS
+        formatted = es_json_formatter.format(log_record)
+        data = json.loads(formatted)
+        assert all(key in self.JSON_FIELDS for key in data.keys())
+        assert data["filename"] == "test_file.txt"
+        assert data["lineno"] == 1
+        assert data["levelname"] == "INFO"
+        assert data["message"] == "Test message"
+
+    def test_formattime_in_iso8601_format(self, es_json_formatter, log_record):
+        es_json_formatter.json_fields = ["asctime"]
+        iso8601_format = es_json_formatter.formatTime(log_record)
+        try:
+            pendulum.parse(iso8601_format, strict=True)
+        except ValueError:
+            raise Exception("Time is not in ISO8601 format")
+
+    def test_extra_fields(self, es_json_formatter, log_record):
+        es_json_formatter.json_fields = self.JSON_FIELDS
+        es_json_formatter.extras = self.EXTRA_FIELDS
+        formatted = es_json_formatter.format(log_record)
+        data = json.loads(formatted)
+        assert all((key in self.JSON_FIELDS or key in self.EXTRA_FIELDS) for key in data.keys())
+        assert data["filename"] == "test_file.txt"
+        assert data["lineno"] == 1
+        assert data["levelname"] == "INFO"
+        assert data["dag_id"] == "dag1"
+        assert data["task_id"] == "task1"
+        assert data["execution_date"] == "2023-11-17"
+        assert data["try_number"] == "1"
+        assert data["log_id"] == "Some_log_id"
diff --git a/tests/providers/elasticsearch/log/test_es_response.py b/tests/providers/elasticsearch/log/test_es_response.py
new file mode 100644
index 0000000000..30c41f8d92
--- /dev/null
+++ b/tests/providers/elasticsearch/log/test_es_response.py
@@ -0,0 +1,211 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any
+
+import pytest
+
+from airflow.providers.elasticsearch.log.es_response import (
+    AttributeDict,
+    AttributeList,
+    ElasticSearchResponse,
+    Hit,
+    HitMeta,
+    _wrap,
+)
+from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
+
+
+class TestWrap:
+    def test_wrap_with_dict(self):
+        test_dict = {"key1": "value1"}
+        result = _wrap(test_dict)
+        assert isinstance(result, AttributeDict)
+        assert result.key1 == "value1"
+
+    def test_wrap_with_non_dict(self):
+        test_values = [1, [2, 3], "string", 4.5]
+        for value in test_values:
+            assert _wrap(value) == value
+
+
+class TestAttributeList:
+    def test_initialization(self):
+        test_list = [1, 2, 3]
+        attr_list = AttributeList(test_list)
+        assert attr_list._l_ == test_list
+
+        test_tuple = (1, 2, 3)
+        attr_list = AttributeList(test_tuple)
+        assert attr_list._l_ == list(test_tuple)
+
+    def test_index_access(self):
+        test_list = [1, {"key1": "value1"}, 3]
+        attr_list = AttributeList(test_list)
+
+        assert attr_list[0] == 1
+        assert isinstance(attr_list[1], AttributeDict)
+        assert attr_list[1].key1 == "value1"
+        assert attr_list[2] == 3
+
+    def test_iteration(self):
+        test_list = [1, {"key": "value"}, 3]
+        attr_list = AttributeList(test_list)
+
+        for i, item in enumerate(attr_list):
+            if isinstance(test_list[i], dict):
+                assert isinstance(item, AttributeDict)
+            else:
+                assert item == test_list[i]
+
+    def test_boolean_representation(self):
+        assert AttributeList([1, 2, 3])
+        assert not (AttributeList([]))
+
+
+class TestAttributeDict:
+    def test_initialization(self):
+        test_dict = {"key1": "value1", "key2": "value2"}
+        attr_dict = AttributeDict(test_dict)
+        assert attr_dict._d_ == test_dict
+
+    def test_attribute_access(self):
+        test_dict = {"key1": "value1", "key2": {"subkey1": "subvalue1"}}
+        attr_dict = AttributeDict(test_dict)
+
+        assert attr_dict.key1 == "value1"
+        assert isinstance(attr_dict.key2, AttributeDict)
+        assert attr_dict.key2.subkey1 == "subvalue1"
+
+    def test_item_access(self):
+        test_dict = {"key1": "value1", "key2": "value2"}
+        attr_dict = AttributeDict(test_dict)
+
+        assert attr_dict["key1"] == "value1"
+        assert attr_dict["key2"] == "value2"
+
+    def test_nonexistent_key(self):
+        test_dict = {"key1": "value1"}
+        attr_dict = AttributeDict(test_dict)
+
+        with pytest.raises(AttributeError):
+            _ = attr_dict.nonexistent_key
+
+    def test_to_dict(self):
+        test_dict = {"key1": "value1", "key2": "value2"}
+        attr_dict = AttributeDict(test_dict)
+        assert attr_dict.to_dict() == test_dict
+
+
+class TestHitAndHitMetaAndElasticSearchResponse:
+    ES_DOCUMENT: dict[str, Any] = {
+        "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total": 7},
+        "hits": {
+            "hits": [
+                {
+                    "_id": "jdeZT4kBjAZqZnexVUxk",
+                    "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
+                    "_score": 2.482621,
+                    "_source": {
+                        "@timestamp": "2023-07-13T14:13:15.140Z",
+                        "asctime": "2023-07-09T07:47:43.907+0000",
+                        "container": {"id": "airflow"},
+                        "dag_id": "example_bash_operator",
+                        "ecs": {"version": "8.0.0"},
+                        "execution_date": "2023_07_09T07_47_32_000000",
+                        "filename": "taskinstance.py",
+                        "input": {"type": "log"},
+                        "levelname": "INFO",
+                        "lineno": 1144,
+                        "log": {
+                            "file": {
+                                "path": "/opt/airflow/Documents/GitHub/airflow/logs/"
+                                "dag_id=example_bash_operator'"
+                                "/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+                            },
+                            "offset": 0,
+                        },
+                        "log.offset": 1688888863907337472,
+                        "log_id": "example_bash_operator-run_after_loop-owen_run_run--1-1",
+                        "message": "Dependencies all met for "
+                        "dep_context=non-requeueable deps "
+                        "ti=<TaskInstance: "
+                        "example_bash_operator.run_after_loop "
+                        "owen_run_run [queued]>",
+                        "task_id": "run_after_loop",
+                        "try_number": "1",
+                    },
+                    "_type": "_doc",
+                }
+            ]
+        },
+    }
+    HIT_DOCUMENT = ES_DOCUMENT["hits"]["hits"][0]
+
+    def test_hit_initialization_and_to_dict(self):
+        hit = Hit(self.HIT_DOCUMENT)
+
+        assert hit.asctime == "2023-07-09T07:47:43.907+0000"
+        assert hit.dag_id == "example_bash_operator"
+        assert hit.lineno == 1144
+        assert (
+            hit.log.file.path
+            == "/opt/airflow/Documents/GitHub/airflow/logs/dag_id=example_bash_operator'/run_id=owen_run_run/task_id=run_after_loop/attempt=1.log"
+        )
+
+        # Test meta attribute
+        assert isinstance(hit.meta, HitMeta)
+        assert hit.to_dict() == self.HIT_DOCUMENT["_source"]
+
+    def test_hitmeta_initialization_and_to_dict(self):
+        hitmeta = HitMeta(self.HIT_DOCUMENT)
+
+        assert hitmeta.id == "jdeZT4kBjAZqZnexVUxk"
+        assert hitmeta.index == ".ds-filebeat-8.8.2-2023.07.09-000001"
+        assert hitmeta.score == 2.482621
+        assert hitmeta.doc_type == "_doc"
+
+        expected_dict = {
+            k[1:] if k.startswith("_") else k: v for (k, v) in self.HIT_DOCUMENT.items() if k != "_source"
+        }
+        expected_dict["doc_type"] = expected_dict.pop("type")
+        assert hitmeta.to_dict() == expected_dict
+
+    def test_elasticsearchresponse_initialization_and_hits_and_bool(self):
+        task_handler = ElasticsearchTaskHandler(
+            base_log_folder="local/log/location",
+            end_of_log_mark="end_of_log\n",
+            write_stdout=False,
+            json_format=False,
+            json_fields="asctime,filename,lineno,levelname,message,exc_text",
+        )
+        response = ElasticSearchResponse(task_handler, self.ES_DOCUMENT)
+
+        assert response._d_ == self.ES_DOCUMENT
+        assert isinstance(response.hits, AttributeList)
+
+        for hit in response.hits:
+            assert isinstance(hit, Hit)
+            assert isinstance(hit.meta, HitMeta)
+
+        assert response.hits[0].asctime == "2023-07-09T07:47:43.907+0000"
+        assert response.hits[0].levelname == "INFO"
+
+        assert bool(response) is True