This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 95d1a382a13 tests: refactor unit test of elasticsearch (#64200)
95d1a382a13 is described below
commit 95d1a382a13c68caae5129da14556999ad69c3a2
Author: Owen Leung <[email protected]>
AuthorDate: Wed Mar 25 15:43:45 2026 +0800
tests: refactor unit test of elasticsearch (#64200)
---
.../unit/elasticsearch/log/elasticmock/__init__.py | 111 --
.../log/elasticmock/fake_elasticsearch.py | 630 ----------
.../log/elasticmock/utilities/__init__.py | 232 ----
.../unit/elasticsearch/log/test_es_task_handler.py | 1225 ++++++--------------
4 files changed, 367 insertions(+), 1831 deletions(-)
diff --git
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py
b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py
deleted file mode 100644
index 40072c63721..00000000000
---
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py
+++ /dev/null
@@ -1,111 +0,0 @@
-"""Elastic mock module used for testing"""
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-#
-# The MIT License (MIT)
-#
-# Copyright (c) 2016 Marcos Cardoso
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-from functools import wraps
-from unittest.mock import patch
-from urllib.parse import unquote, urlparse
-
-from unit.elasticsearch.log.elasticmock.fake_elasticsearch import
FakeElasticsearch
-
-ELASTIC_INSTANCES: dict[str, FakeElasticsearch] = {}
-
-
-def _normalize_hosts(hosts):
- """
- Helper function to transform hosts argument to
- :class:`~elasticsearch.Elasticsearch` to a list of dicts.
- """
- # if hosts are empty, just defer to defaults down the line
- if hosts is None:
- return [{}]
-
- hosts = [hosts]
-
- out = []
-
- for host_raw in hosts:
- host = f"//{host_raw}" if "://" not in host_raw else host_raw
-
- parsed_url = urlparse(host)
- h = {"host": parsed_url.hostname}
-
- if parsed_url.port:
- h["port"] = parsed_url.port
-
- if parsed_url.scheme == "https":
- h["port"] = parsed_url.port or 443
- h["use_ssl"] = True
-
- if parsed_url.username or parsed_url.password:
- h["http_auth"] =
f"{unquote(parsed_url.username)}:{unquote(parsed_url.password)}"
-
- if parsed_url.path and parsed_url.path != "/":
- h["url_prefix"] = parsed_url.path
-
- out.append(h)
- out.append(host)
- return out
-
-
-def _get_elasticmock(hosts=None, *args, **kwargs):
- host = _normalize_hosts(hosts)[0]
- elastic_key = f"http://{host.get('host', 'localhost')}:{host.get('port',
9200)}"
-
- if elastic_key in ELASTIC_INSTANCES:
- connection = ELASTIC_INSTANCES.get(elastic_key)
- else:
- connection = FakeElasticsearch()
- ELASTIC_INSTANCES[elastic_key] = connection
- return connection
-
-
-def elasticmock(function):
- """Elasticmock decorator"""
-
- @wraps(function)
- def decorated(*args, **kwargs):
- ELASTIC_INSTANCES.clear()
- with patch("elasticsearch.Elasticsearch", _get_elasticmock):
- result = function(*args, **kwargs)
- return result
-
- return decorated
diff --git
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/fake_elasticsearch.py
b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/fake_elasticsearch.py
deleted file mode 100644
index 4b9d81c0210..00000000000
---
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/fake_elasticsearch.py
+++ /dev/null
@@ -1,630 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import fnmatch
-import json
-
-from elasticsearch import Elasticsearch
-from elasticsearch.exceptions import NotFoundError
-
-from unit.elasticsearch.log.elasticmock.utilities import (
- MissingIndexException,
- get_random_id,
- query_params,
-)
-
-#
-# The MIT License (MIT)
-#
-# Copyright (c) 2016 Marcos Cardoso
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-
-
-class FakeElasticsearch(Elasticsearch):
- __documents_dict = None
-
- def __init__(self):
- super().__init__("http://localhost:9200")
- self.__documents_dict = {}
-
- @query_params()
- def ping(self, params=None):
- return True
-
- @query_params()
- def info(self, params=None):
- return {
- "status": 200,
- "cluster_name": "elasticmock",
- "version": {
- "lucene_version": "4.10.4",
- "build_hash": "00f95f4ffca6de89d68b7ccaf80d148f1f70e4d4",
- "number": "1.7.5",
- "build_timestamp": "2016-02-02T09:55:30Z",
- "build_snapshot": False,
- },
- "name": "Nightwatch",
- "tagline": "You Know, for Search",
- }
-
- @query_params()
- def sample_airflow_2_log_response(self, headers=None, params=None):
- return {
- "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total":
7},
- "hits": {
- "hits": [
- {
- "_id": "jdeZT4kBjAZqZnexVUxk",
- "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
- "_score": 2.482621,
- "_source": {
- "@timestamp": "2023-07-13T14:13:15.140Z",
- "asctime": "2023-07-09T07:47:43.907+0000",
- "container": {"id": "airflow"},
- "dag_id": "example_bash_operator",
- "ecs": {"version": "8.0.0"},
- "execution_date": "2023_07_09T07_47_32_000000",
- "filename": "taskinstance.py",
- "input": {"type": "log"},
- "levelname": "INFO",
- "lineno": 1144,
- "log": {
- "file": {
- "path":
"/opt/airflow/Documents/GitHub/airflow/logs/"
- "dag_id=example_bash_operator'"
-
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
- },
- "offset": 0,
- },
- "log.offset": 1688888863907337472,
- "log_id":
"example_bash_operator-run_after_loop-run_run--1-1",
- "message": "Dependencies all met for "
- "dep_context=non-requeueable deps "
- "ti=<TaskInstance: "
- "example_bash_operator.run_after_loop ",
- "task_id": "run_after_loop",
- "try_number": "1",
- },
- "_type": "_doc",
- },
- {
- "_id": "qteZT4kBjAZqZnexVUxl",
- "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
- "_score": 2.482621,
- "_source": {
- "@timestamp": "2023-07-13T14:13:15.141Z",
- "asctime": "2023-07-09T07:47:43.917+0000",
- "container": {"id": "airflow"},
- "dag_id": "example_bash_operator",
- "ecs": {"version": "8.0.0"},
- "execution_date": "2023_07_09T07_47_32_000000",
- "filename": "taskinstance.py",
- "input": {"type": "log"},
- "levelname": "INFO",
- "lineno": 1347,
- "log": {
- "file": {
- "path":
"/opt/airflow/Documents/GitHub/airflow/logs/"
- "dag_id=example_bash_operator"
-
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
- },
- "offset": 988,
- },
- "log.offset": 1688888863917961216,
- "log_id":
"example_bash_operator-run_after_loop-run_run--1-1",
- "message": "Starting attempt 1 of 1",
- "task_id": "run_after_loop",
- "try_number": "1",
- },
- "_type": "_doc",
- },
- {
- "_id": "v9eZT4kBjAZqZnexVUx2",
- "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
- "_score": 2.482621,
- "_source": {
- "@timestamp": "2023-07-13T14:13:15.143Z",
- "asctime": "2023-07-09T07:47:43.928+0000",
- "container": {"id": "airflow"},
- "dag_id": "example_bash_operator",
- "ecs": {"version": "8.0.0"},
- "execution_date": "2023_07_09T07_47_32_000000",
- "filename": "taskinstance.py",
- "input": {"type": "log"},
- "levelname": "INFO",
- "lineno": 1368,
- "log": {
- "file": {
- "path":
"/opt/airflow/Documents/GitHub/airflow/logs/"
- "dag_id=example_bash_operator"
-
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
- },
- "offset": 1372,
- },
- "log.offset": 1688888863928218880,
- "log_id":
"example_bash_operator-run_after_loop-run_run--1-1",
- "message": "Executing <Task(BashOperator): "
- "run_after_loop> on 2023-07-09 "
- "07:47:32+00:00",
- "task_id": "run_after_loop",
- "try_number": "1",
- },
- "_type": "_doc",
- },
- ],
- "max_score": 2.482621,
- "total": {"relation": "eq", "value": 36},
- },
- "timed_out": False,
- "took": 7,
- }
-
- @query_params()
- def sample_airflow_3_log_response(self, headers=None, params=None):
- return {
- "_shards": {"failed": 0, "skipped": 0, "successful": 7, "total":
7},
- "hits": {
- "hits": [
- {
- "_id": "jdeZT4kBjAZqZnexVUxk",
- "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
- "_score": 2.482621,
- "_source": {
- "@timestamp": "2023-07-13T14:13:15.140Z",
- "asctime": "2023-07-09T07:47:43.907+0000",
- "container": {"id": "airflow"},
- "dag_id": "example_bash_operator",
- "ecs": {"version": "8.0.0"},
- "execution_date": "2023_07_09T07_47_32_000000",
- "filename": "taskinstance.py",
- "input": {"type": "log"},
- "levelname": "INFO",
- "lineno": 1144,
- "log": {
- "file": {
- "path":
"/opt/airflow/Documents/GitHub/airflow/logs/"
- "dag_id=example_bash_operator'"
-
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
- },
- "offset": 0,
- },
- "log.offset": 1688888863907337472,
- "log_id":
"example_bash_operator-run_after_loop-run_run--1-1",
- "task_id": "run_after_loop",
- "try_number": "1",
- "event": "Dependencies all met for "
- "dep_context=non-requeueable deps "
- "ti=<TaskInstance: "
- "example_bash_operator.run_after_loop ",
- },
- "_type": "_doc",
- },
- {
- "_id": "qteZT4kBjAZqZnexVUxl",
- "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
- "_score": 2.482621,
- "_source": {
- "@timestamp": "2023-07-13T14:13:15.141Z",
- "asctime": "2023-07-09T07:47:43.917+0000",
- "container": {"id": "airflow"},
- "dag_id": "example_bash_operator",
- "ecs": {"version": "8.0.0"},
- "execution_date": "2023_07_09T07_47_32_000000",
- "filename": "taskinstance.py",
- "input": {"type": "log"},
- "levelname": "INFO",
- "lineno": 1347,
- "log": {
- "file": {
- "path":
"/opt/airflow/Documents/GitHub/airflow/logs/"
- "dag_id=example_bash_operator"
-
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
- },
- "offset": 988,
- },
- "log.offset": 1688888863917961216,
- "log_id":
"example_bash_operator-run_after_loop-run_run--1-1",
- "event": "Starting attempt 1 of 1",
- "task_id": "run_after_loop",
- "try_number": "1",
- },
- "_type": "_doc",
- },
- {
- "_id": "v9eZT4kBjAZqZnexVUx2",
- "_index": ".ds-filebeat-8.8.2-2023.07.09-000001",
- "_score": 2.482621,
- "_source": {
- "@timestamp": "2023-07-13T14:13:15.143Z",
- "asctime": "2023-07-09T07:47:43.928+0000",
- "container": {"id": "airflow"},
- "dag_id": "example_bash_operator",
- "ecs": {"version": "8.0.0"},
- "execution_date": "2023_07_09T07_47_32_000000",
- "filename": "taskinstance.py",
- "input": {"type": "log"},
- "levelname": "INFO",
- "lineno": 1368,
- "log": {
- "file": {
- "path":
"/opt/airflow/Documents/GitHub/airflow/logs/"
- "dag_id=example_bash_operator"
-
"/run_id=run_run/task_id=run_after_loop/attempt=1.log"
- },
- "offset": 1372,
- },
- "log.offset": 1688888863928218880,
- "log_id":
"example_bash_operator-run_after_loop-run_run--1-1",
- "task_id": "run_after_loop",
- "try_number": "1",
- "event": "Executing <Task(BashOperator): "
- "run_after_loop> on 2023-07-09 "
- "07:47:32+00:00",
- },
- "_type": "_doc",
- },
- ],
- "max_score": 2.482621,
- "total": {"relation": "eq", "value": 36},
- },
- "timed_out": False,
- "took": 7,
- }
-
- @query_params(
- "consistency",
- "op_type",
- "parent",
- "refresh",
- "replication",
- "routing",
- "timeout",
- "timestamp",
- "ttl",
- "version",
- "version_type",
- )
- def index(self, index, document=None, doc_type=None, body=None, id=None,
params=None, headers=None):
- if document is None:
- document = body
- if index not in self.__documents_dict:
- self.__documents_dict[index] = []
-
- if id is None:
- id = get_random_id()
-
- version = 1
-
- self.__documents_dict[index].append(
- {
- "_type": doc_type,
- "_id": id,
- "_source": document,
- "_index": index,
- "_version": version,
- "_headers": headers,
- }
- )
-
- return {
- "_type": doc_type,
- "_id": id,
- "created": True,
- "_version": version,
- "_index": index,
- "_headers": headers,
- }
-
- @query_params("parent", "preference", "realtime", "refresh", "routing")
- def exists(self, index, doc_type, id, params=None):
- result = False
- if index in self.__documents_dict:
- for document in self.__documents_dict[index]:
- if document.get("_id") == id and document.get("_type") ==
doc_type:
- result = True
- break
- return result
-
- @query_params(
- "_source",
- "_source_exclude",
- "_source_include",
- "fields",
- "parent",
- "preference",
- "realtime",
- "refresh",
- "routing",
- "version",
- "version_type",
- )
- def get(self, index, id, doc_type="_all", params=None):
- result = None
- if index in self.__documents_dict:
- result = self.find_document(doc_type, id, index, result)
-
- if result:
- result["found"] = True
- else:
- error_data = {"_index": index, "_type": doc_type, "_id": id,
"found": False}
- raise NotFoundError(404, json.dumps(error_data))
-
- return result
-
- def find_document(self, doc_type, id, index, result):
- for document in self.__documents_dict[index]:
- if document.get("_id") == id:
- if doc_type == "_all" or document.get("_type") == doc_type:
- result = document
- break
- return result
-
- @query_params(
- "_source",
- "_source_exclude",
- "_source_include",
- "parent",
- "preference",
- "realtime",
- "refresh",
- "routing",
- "version",
- "version_type",
- )
- def get_source(self, index, doc_type, id, params=None):
- document = self.get(index=index, doc_type=doc_type, id=id,
params=params)
- return document.get("_source")
-
- @query_params(
- "_source",
- "_source_exclude",
- "_source_include",
- "allow_no_indices",
- "analyze_wildcard",
- "analyzer",
- "default_operator",
- "df",
- "expand_wildcards",
- "explain",
- "fielddata_fields",
- "fields",
- "from_",
- "ignore_unavailable",
- "lenient",
- "lowercase_expanded_terms",
- "preference",
- "q",
- "request_cache",
- "routing",
- "scroll",
- "search_type",
- "size",
- "sort",
- "stats",
- "suggest_field",
- "suggest_mode",
- "suggest_size",
- "suggest_text",
- "terminate_after",
- "timeout",
- "track_scores",
- "version",
- )
- def count(self, index=None, doc_type=None, query=None, params=None,
headers=None):
- searchable_indexes = self._normalize_index_to_list(index, query=query)
- searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
- i = 0
- for searchable_index in searchable_indexes:
- for document in self.__documents_dict[searchable_index]:
- if not searchable_doc_types or document.get("_type") in
searchable_doc_types:
- i += 1
- result = {"count": i, "_shards": {"successful": 1, "failed": 0,
"total": 1}}
-
- return result
-
- @query_params(
- "_source",
- "_source_exclude",
- "_source_include",
- "allow_no_indices",
- "analyze_wildcard",
- "analyzer",
- "default_operator",
- "df",
- "expand_wildcards",
- "explain",
- "fielddata_fields",
- "fields",
- "from_",
- "ignore_unavailable",
- "lenient",
- "lowercase_expanded_terms",
- "preference",
- "q",
- "request_cache",
- "routing",
- "scroll",
- "search_type",
- "size",
- "sort",
- "stats",
- "suggest_field",
- "suggest_mode",
- "suggest_size",
- "suggest_text",
- "terminate_after",
- "timeout",
- "track_scores",
- "version",
- )
- def search(self, index=None, doc_type=None, query=None, params=None,
headers=None):
- searchable_indexes = self._normalize_index_to_list(index, query=query)
-
- matches = self._find_match(index, doc_type, query=query)
-
- result = {
- "hits": {"total": len(matches), "max_score": 1.0},
- "_shards": {
- # Simulate indexes with 1 shard each
- "successful": len(searchable_indexes),
- "failed": 0,
- "total": len(searchable_indexes),
- },
- "took": 1,
- "timed_out": False,
- }
-
- hits = []
- for match in matches:
- match["_score"] = 1.0
- hits.append(match)
- result["hits"]["hits"] = hits
-
- return result
-
- @query_params(
- "consistency", "parent", "refresh", "replication", "routing",
"timeout", "version", "version_type"
- )
- def delete(self, index, doc_type, id, params=None, headers=None):
- found = False
-
- if index in self.__documents_dict:
- for document in self.__documents_dict[index]:
- if document.get("_type") == doc_type and document.get("_id")
== id:
- found = True
- self.__documents_dict[index].remove(document)
- break
-
- result_dict = {
- "found": found,
- "_index": index,
- "_type": doc_type,
- "_id": id,
- "_version": 1,
- }
-
- if found:
- return result_dict
- raise NotFoundError(404, json.dumps(result_dict))
-
- @query_params("allow_no_indices", "expand_wildcards",
"ignore_unavailable", "preference", "routing")
- def suggest(self, body, index=None):
- if index is not None and index not in self.__documents_dict:
- raise NotFoundError(404, f"IndexMissingException[[{index}]
missing]")
-
- result_dict = {}
- for key, value in body.items():
- text = value.get("text")
- suggestion = int(text) + 1 if isinstance(text, int) else
f"{text}_suggestion"
- result_dict[key] = [
- {
- "text": text,
- "length": 1,
- "options": [{"text": suggestion, "freq": 1, "score": 1.0}],
- "offset": 0,
- }
- ]
- return result_dict
-
- def _find_match(self, index, doc_type, query):
- searchable_indexes = self._normalize_index_to_list(index, query=query)
- searchable_doc_types = self._normalize_doc_type_to_list(doc_type)
-
- must = query["bool"]["must"][0] # only support one must
-
- matches = []
- for searchable_index in searchable_indexes:
- self.find_document_in_searchable_index(matches, must,
searchable_doc_types, searchable_index)
-
- return matches
-
- def find_document_in_searchable_index(self, matches, must,
searchable_doc_types, searchable_index):
- for document in self.__documents_dict[searchable_index]:
- if not searchable_doc_types or document.get("_type") in
searchable_doc_types:
- if "match_phrase" in must:
- self.match_must_phrase(document, matches, must)
- else:
- matches.append(document)
-
- @staticmethod
- def match_must_phrase(document, matches, must):
- for query_id in must["match_phrase"]:
- query_val = must["match_phrase"][query_id]
- if query_id in document["_source"]:
- if query_val in document["_source"][query_id]:
- # use in as a proxy for match_phrase
- matches.append(document)
-
- # Check index(es) exists.
- def _validate_search_targets(self, targets, query):
- # TODO: support allow_no_indices query parameter
- matches = set()
- for target in targets:
- if target in ("_all", ""):
- matches.update(self.__documents_dict)
- elif "*" in target:
- matches.update(fnmatch.filter(self.__documents_dict, target))
- elif target not in self.__documents_dict:
- raise
MissingIndexException(msg=f"IndexMissingException[[{target}] missing]",
query=query)
- return matches
-
- def _normalize_index_to_list(self, index, query):
- # Ensure to have a list of index
- if index is None:
- searchable_indexes = self.__documents_dict.keys()
- elif isinstance(index, str):
- searchable_indexes = [index]
- elif isinstance(index, list):
- searchable_indexes = index
- else:
- # Is it the correct exception to use ?
- raise ValueError("Invalid param 'index'")
- generator = (target for index in searchable_indexes for target in
index.split(","))
- return list(self._validate_search_targets(generator, query=query))
-
- @staticmethod
- def _normalize_doc_type_to_list(doc_type):
- # Ensure to have a list of index
- if doc_type is None:
- searchable_doc_types = []
- elif isinstance(doc_type, str):
- searchable_doc_types = [doc_type]
- elif isinstance(doc_type, list):
- searchable_doc_types = doc_type
- else:
- # Is it the correct exception to use ?
- raise ValueError("Invalid param 'index'")
-
- return searchable_doc_types
diff --git
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/utilities/__init__.py
b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/utilities/__init__.py
deleted file mode 100644
index 50b883e0f02..00000000000
---
a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/utilities/__init__.py
+++ /dev/null
@@ -1,232 +0,0 @@
-"""Utilities for Elastic mock"""
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-#
-# The MIT License (MIT)
-#
-# Copyright (c) 2016 Marcos Cardoso
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in
all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-import base64
-import random
-import string
-from datetime import date, datetime
-from functools import wraps
-
-from elasticsearch.exceptions import NotFoundError
-
-DEFAULT_ELASTICSEARCH_ID_SIZE = 20
-CHARSET_FOR_ELASTICSEARCH_ID = string.ascii_letters + string.digits
-GLOBAL_PARAMS = ("pretty", "human", "error_trace", "format", "filter_path")
-
-
-def get_random_id(size=DEFAULT_ELASTICSEARCH_ID_SIZE):
- """Returns random if for elasticsearch"""
- return "".join(random.choices(CHARSET_FOR_ELASTICSEARCH_ID, k=size))
-
-
-def query_params(*es_query_params, **kwargs):
- """
- Decorator that pops all accepted parameters from method's kwargs and puts
- them in the params argument.
- """
- body_params = kwargs.pop("body_params", None)
- body_only_params = set(body_params or ()) - set(es_query_params)
- body_name = kwargs.pop("body_name", None)
- body_required = kwargs.pop("body_required", False)
- type_possible_in_params = "type" in es_query_params
-
- assert not (body_name and body_params)
-
- assert not (body_name and body_required)
- assert not body_required or body_params
-
- def _wrapper(func):
- @wraps(func)
- def _wrapped(*args, **kwargs):
- params = (kwargs.pop("params", None) or {}).copy()
- headers = {k.lower(): v for k, v in (kwargs.pop("headers", None)
or {}).copy().items()}
-
- if "opaque_id" in kwargs:
- headers["x-opaque-id"] = kwargs.pop("opaque_id")
-
- http_auth = kwargs.pop("http_auth", None)
- api_key = kwargs.pop("api_key", None)
-
- using_body_kwarg = kwargs.get("body", None) is not None
- using_positional_args = args and len(args) > 1
-
- if type_possible_in_params:
- doc_type_in_params = params and "doc_type" in params
- doc_type_in_kwargs = "doc_type" in kwargs
-
- if doc_type_in_params:
- params["type"] = params.pop("doc_type")
- if doc_type_in_kwargs:
- kwargs["type"] = kwargs.pop("doc_type")
-
- if using_body_kwarg or using_positional_args:
- body_only_params_in_use = body_only_params.intersection(kwargs)
- if body_only_params_in_use:
- params_prose = "', '".join(sorted(body_only_params_in_use))
- plural_params = len(body_only_params_in_use) > 1
-
- raise TypeError(
- f"The '{params_prose}' parameter{'s' if plural_params
else ''} "
- f"{'are' if plural_params else 'is'} only serialized
in the "
- f"request body and can't be combined with the 'body'
parameter. "
- f"Either stop using the 'body' parameter and use
keyword-arguments "
- f"only or move the specified parameters into the
'body'. "
- f"See
https://github.com/elastic/elasticsearch-py/issues/1698 "
- f"for more information"
- )
-
- elif set(body_params or ()).intersection(kwargs):
- body = {}
- for param in body_params:
- value = kwargs.pop(param, None)
- if value is not None:
- body[param.rstrip("_")] = value
- kwargs["body"] = body
-
- elif body_required:
- kwargs["body"] = {}
-
- if body_name:
- if body_name in kwargs:
- if using_body_kwarg:
- raise TypeError(
- f"Can't use '{body_name}' and 'body' parameters
together"
- f" because '{body_name}' is an alias for 'body'. "
- f"Instead you should only use the '{body_name}' "
- f"parameter. See
https://github.com/elastic/elasticsearch-py/issues/1698 "
- f"for more information"
- )
- kwargs["body"] = kwargs.pop(body_name)
-
- if http_auth is not None and api_key is not None:
- raise ValueError("Only one of 'http_auth' and 'api_key' may be
passed at a time")
- if http_auth is not None:
- headers["authorization"] = f"Basic
{_base64_auth_header(http_auth)}"
- elif api_key is not None:
- headers["authorization"] = f"ApiKey
{_base64_auth_header(api_key)}"
-
- for p in es_query_params + GLOBAL_PARAMS:
- if p in kwargs:
- v = kwargs.pop(p)
- if v is not None:
- params[p] = _escape(v)
-
- for p in ("ignore", "request_timeout"):
- if p in kwargs:
- params[p] = kwargs.pop(p)
- return func(*args, params=params, headers=headers, **kwargs)
-
- return _wrapped
-
- return _wrapper
-
-
-def to_str(x, encoding="ascii"):
- if not isinstance(x, str):
- return x.decode(encoding)
- return x
-
-
-def to_bytes(x, encoding="ascii"):
- if not isinstance(x, bytes):
- return x.encode(encoding)
- return x
-
-
-def _base64_auth_header(auth_value):
- """Takes either a 2-tuple or a base64-encoded string
- and returns a base64-encoded string to be used
- as an HTTP authorization header.
- """
- if isinstance(auth_value, (list, tuple)):
- auth_value = base64.b64encode(to_bytes(":".join(auth_value)))
- return to_str(auth_value)
-
-
-def _escape(value):
- """
- Escape a single value of a URL string or a query parameter. If it is a list
- or tuple, turn it into a comma-separated string first.
- """
-
- # make sequences into comma-separated strings
- if isinstance(value, (list, tuple)):
- value = ",".join(value)
-
- # dates and datetimes into isoformat
- elif isinstance(value, (date, datetime)):
- value = value.isoformat()
-
- # make bools into true/false strings
- elif isinstance(value, bool):
- value = str(value).lower()
-
- # don't decode bytestrings
- elif isinstance(value, bytes):
- return value
-
- # encode strings to utf-8
- if isinstance(value, str):
- return value.encode("utf-8")
-
- return str(value)
-
-
-class MissingIndexException(NotFoundError):
- """Exception representing a missing index."""
-
- def __init__(self, msg, query):
- self.msg = msg
- self.query = query
-
- def __str__(self):
- return f"IndexMissingException[[{self.msg}] missing] with query
{self.query}"
-
-
-class SearchFailedException(NotFoundError):
- """Exception representing a search failure."""
-
- def __init__(self, msg):
- self.msg = msg
-
- def __str__(self):
- return f"SearchFailedException: {self.msg}"
diff --git
a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
index c679eedf38e..6b11c6068a5 100644
---
a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
+++
b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py
@@ -17,13 +17,10 @@
# under the License.
from __future__ import annotations
+import dataclasses
import json
import logging
-import os
import re
-import shutil
-import tempfile
-import uuid
from io import StringIO
from pathlib import Path
from unittest import mock
@@ -35,12 +32,15 @@ import pendulum
import pytest
from airflow.providers.common.compat.sdk import conf
+from airflow.providers.elasticsearch.log.es_json_formatter import
ElasticsearchJSONFormatter
from airflow.providers.elasticsearch.log.es_response import
ElasticSearchResponse
from airflow.providers.elasticsearch.log.es_task_handler import (
VALID_ES_CONFIG_KEYS,
ElasticsearchRemoteLogIO,
ElasticsearchTaskHandler,
+ _build_log_fields,
_clean_date,
+ _format_error_detail,
_render_log_id,
get_es_kwargs_from_config,
getattr_nested,
@@ -51,12 +51,16 @@ from airflow.utils.timezone import datetime
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_dags, clear_db_runs
-from tests_common.test_utils.paths import AIRFLOW_PROVIDERS_ROOT_PATH
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
-from unit.elasticsearch.log.elasticmock import elasticmock
-from unit.elasticsearch.log.elasticmock.utilities import SearchFailedException
-ES_PROVIDER_YAML_FILE = AIRFLOW_PROVIDERS_ROOT_PATH / "elasticsearch" / "provider.yaml"
+
[email protected]
+class _MockTI:
+ dag_id: str = "dag_for_testing_es_log_handler"
+ task_id: str = "task_for_testing_es_log_handler"
+ run_id: str = "run_for_testing_es_log_handler"
+ try_number: int = 1
+ map_index: int = -1
def get_ti(dag_id, task_id, run_id, logical_date, create_task_instance):
@@ -73,6 +77,78 @@ def get_ti(dag_id, task_id, run_id, logical_date,
create_task_instance):
return ti
+def _build_es_search_response(*sources: dict, index: str = "test_index", doc_type: str = "_doc") -> dict:
+ hits = [
+ {
+ "_id": str(i),
+ "_index": index,
+ "_score": 1.0,
+ "_source": source,
+ "_type": doc_type,
+ }
+ for i, source in enumerate(sources, start=1)
+ ]
+ return {
+ "_shards": {"failed": 0, "skipped": 0, "successful": 1, "total": 1},
+ "hits": {
+ "hits": hits,
+ "max_score": 1.0,
+ "total": {"relation": "eq", "value": len(hits)},
+ },
+ "timed_out": False,
+ "took": 1,
+ }
+
+
+def _make_es_response(search, *sources: dict) -> ElasticSearchResponse:
+ return ElasticSearchResponse(search, _build_es_search_response(*sources))
+
+
+def _metadata_from_result(metadatas):
+ return metadatas if AIRFLOW_V_3_0_PLUS else metadatas[0]
+
+
+def _assert_log_events(logs, metadatas, *, expected_events: list[str], expected_sources: list[str]):
+ metadata = _metadata_from_result(metadatas)
+ if AIRFLOW_V_3_0_PLUS:
+ logs = list(logs)
+ assert logs[0].event == "::group::Log message source details"
+ assert logs[0].sources == expected_sources
+ assert logs[1].event == "::endgroup::"
+ assert [log.event for log in logs[2:]] == expected_events
+ else:
+ assert len(logs) == 1
+ assert len(logs[0]) == 1
+ assert logs[0][0][0] == expected_sources[0]
+ assert logs[0][0][1] == "\n".join(expected_events)
+ return metadata
+
+
+def _assert_no_logs(logs, metadatas):
+ metadata = _metadata_from_result(metadatas)
+ if AIRFLOW_V_3_0_PLUS:
+ assert logs == []
+ else:
+ assert logs == [[]]
+ return metadata
+
+
+def _assert_missing_log_message(logs):
+ expected_pattern = r"^\*\*\* Log .* not found in Elasticsearch.*"
+ if AIRFLOW_V_3_0_PLUS:
+ logs = list(logs)
+ assert len(logs) == 1
+ assert logs[0].event is not None
+ assert logs[0].event.startswith("*** Log ")
+ assert logs[0].event.endswith("may have been removed.")
+ assert logs[0].event
+ assert re.match(expected_pattern, logs[0].event) is not None
+ else:
+ assert len(logs) == 1
+ assert len(logs[0]) == 1
+ assert re.match(expected_pattern, logs[0][0][1]) is not None
+
+
class TestElasticsearchTaskHandler:
DAG_ID = "dag_for_testing_es_task_handler"
TASK_ID = "task_for_testing_es_log_handler"
@@ -81,15 +157,39 @@ class TestElasticsearchTaskHandler:
TRY_NUM = 1
LOGICAL_DATE = datetime(2016, 1, 1)
LOG_ID = f"{DAG_ID}-{TASK_ID}-{RUN_ID}-{MAP_INDEX}-{TRY_NUM}"
- JSON_LOG_ID = f"{DAG_ID}-{TASK_ID}-{_clean_date(LOGICAL_DATE)}-1"
FILENAME_TEMPLATE = "{try_number}.log"
- # TODO: Remove when we stop testing for 2.11 compatibility
@pytest.fixture(autouse=True)
def _use_historical_filename_templates(self):
with conf_vars({("core", "use_historical_filename_templates"):
"True"}):
yield
+ @pytest.fixture(autouse=True)
+ def _setup_handler(self, tmp_path):
+ self.local_log_location = str(tmp_path / "logs")
+ self.end_of_log_mark = "end_of_log\n"
+ self.write_stdout = False
+ self.json_format = False
+ self.json_fields = "asctime,filename,lineno,levelname,message,exc_text"
+ self.host_field = "host"
+ self.offset_field = "offset"
+ self.test_message = "some random stuff"
+ self.base_log_source = {
+ "message": self.test_message,
+ "event": self.test_message,
+ "log_id": self.LOG_ID,
+ "offset": 1,
+ }
+ self.es_task_handler = ElasticsearchTaskHandler(
+ base_log_folder=self.local_log_location,
+ end_of_log_mark=self.end_of_log_mark,
+ write_stdout=self.write_stdout,
+ json_format=self.json_format,
+ json_fields=self.json_fields,
+ host_field=self.host_field,
+ offset_field=self.offset_field,
+ )
+
@pytest.fixture
def ti(self, create_task_instance, create_log_template):
create_log_template(
@@ -110,62 +210,6 @@ class TestElasticsearchTaskHandler:
clear_db_runs()
clear_db_dags()
- @elasticmock
- def setup_method(self, method):
- self.local_log_location = "local/log/location"
- self.end_of_log_mark = "end_of_log\n"
- self.write_stdout = False
- self.json_format = False
- self.json_fields = "asctime,filename,lineno,levelname,message,exc_text"
- self.host_field = "host"
- self.offset_field = "offset"
- self.es_task_handler = ElasticsearchTaskHandler(
- base_log_folder=self.local_log_location,
- end_of_log_mark=self.end_of_log_mark,
- write_stdout=self.write_stdout,
- json_format=self.json_format,
- json_fields=self.json_fields,
- host_field=self.host_field,
- offset_field=self.offset_field,
- )
-
- self.es = elasticsearch.Elasticsearch("http://localhost:9200")
- self.index_name = "test_index"
- self.doc_type = "log"
- self.test_message = "some random stuff"
- self.body = {
- "message": self.test_message,
- "log_id": self.LOG_ID,
- "offset": 1,
- "event": self.test_message,
- }
- self.es.index(index=self.index_name, doc_type=self.doc_type,
document=self.body, id=1)
-
- def teardown_method(self):
- shutil.rmtree(self.local_log_location.split(os.path.sep)[0],
ignore_errors=True)
-
- @pytest.mark.parametrize(
- "sample_response",
- [
- pytest.param(lambda self: self.es.sample_airflow_2_log_response(),
id="airflow_2"),
- pytest.param(lambda self: self.es.sample_airflow_3_log_response(),
id="airflow_3"),
- ],
- )
- def test_es_response(self, sample_response):
- response = sample_response(self)
- es_response = ElasticSearchResponse(self.es_task_handler, response)
- logs_by_host = self.es_task_handler.io._group_logs_by_host(es_response)
-
- for hosted_log in logs_by_host.values():
- message = self.es_task_handler.concat_logs(hosted_log)
-
- assert (
- message == "Dependencies all met for dep_context=non-requeueable"
- " deps ti=<TaskInstance: example_bash_operator.run_after_loop \n"
- "Starting attempt 1 of 1\nExecuting <Task(BashOperator):
run_after_loop> "
- "on 2023-07-09 07:47:32+00:00"
- )
-
@pytest.mark.parametrize(
("host", "expected"),
[
@@ -177,12 +221,9 @@ class TestElasticsearchTaskHandler:
],
)
def test_format_url(self, host, expected):
- """
- Test the format_url method of the ElasticsearchTaskHandler class.
- """
if expected == "ValueError":
with pytest.raises(ValueError, match="'https://' is not a valid
URL."):
- assert ElasticsearchTaskHandler.format_url(host) == expected
+ ElasticsearchTaskHandler.format_url(host)
else:
assert ElasticsearchTaskHandler.format_url(host) == expected
@@ -197,7 +238,6 @@ class TestElasticsearchTaskHandler:
"verify_certs": True,
}
assert es_conf == expected_dict
- # ensure creating with configs does not fail
ElasticsearchTaskHandler(
base_log_folder=self.local_log_location,
end_of_log_mark=self.end_of_log_mark,
@@ -209,387 +249,118 @@ class TestElasticsearchTaskHandler:
es_kwargs=es_conf,
)
- def test_client_with_patterns(self):
- # ensure creating with index patterns does not fail
- patterns = "test_*,other_*"
- handler = ElasticsearchTaskHandler(
- base_log_folder=self.local_log_location,
- end_of_log_mark=self.end_of_log_mark,
- write_stdout=self.write_stdout,
- json_format=self.json_format,
- json_fields=self.json_fields,
- host_field=self.host_field,
- offset_field=self.offset_field,
- index_patterns=patterns,
- )
- assert handler.index_patterns == patterns
-
- @pytest.mark.db_test
- def test_read(self, ti):
- ts = pendulum.now()
- logs, metadatas = self.es_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False}
- )
-
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost:9200"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == "some random stuff"
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert self.test_message == logs[0][0][-1]
-
- metadata = metadatas[0]
-
- assert not metadata["end_of_log"]
- assert metadata["offset"] == "1"
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
- @pytest.mark.db_test
- def test_read_with_patterns(self, ti):
- ts = pendulum.now()
- with mock.patch.object(self.es_task_handler, "index_patterns",
new="test_*,other_*"):
- logs, metadatas = self.es_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts),
"end_of_log": False}
- )
-
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost:9200"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == "some random stuff"
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert self.test_message == logs[0][0][-1]
-
- metadata = metadatas[0]
-
- assert not metadata["end_of_log"]
- assert metadata["offset"] == "1"
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
@pytest.mark.db_test
- def test_read_with_patterns_no_match(self, ti):
- ts = pendulum.now()
- with mock.patch.object(self.es_task_handler.io, "index_patterns",
new="test_other_*,test_another_*"):
- logs, metadatas = self.es_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts),
"end_of_log": False}
- )
-
- if AIRFLOW_V_3_0_PLUS:
- assert logs == []
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert logs == [[]]
-
- metadata = metadatas[0]
-
- assert metadata["offset"] == "0"
- assert metadata["end_of_log"]
- # last_log_timestamp won't change if no log lines read.
- assert timezone.parse(metadata["last_log_timestamp"]) == ts
-
- @pytest.mark.db_test
- def test_read_with_missing_index(self, ti):
- ts = pendulum.now()
- with mock.patch.object(self.es_task_handler.io, "index_patterns",
new="nonexistent,test_*"):
- with pytest.raises(elasticsearch.exceptions.NotFoundError,
match=r"IndexMissingException.*"):
- self.es_task_handler.read(
+ @pytest.mark.parametrize("metadata_mode", ["provided", "none", "empty"])
+ def test_read(self, ti, metadata_mode):
+ start_time = pendulum.now()
+        response = _make_es_response(self.es_task_handler.io, self.base_log_source)
+
+        with patch.object(self.es_task_handler.io, "_es_read", return_value=response):
+ if metadata_mode == "provided":
+ logs, metadatas = self.es_task_handler.read(
ti,
1,
- {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False},
+ {"offset": 0, "last_log_timestamp": str(start_time),
"end_of_log": False},
)
-
- @pytest.mark.db_test
- @pytest.mark.parametrize("seconds", [3, 6])
- def test_read_missing_logs(self, seconds, create_task_instance):
- """
- When the log actually isn't there to be found, we only want to wait
for 5 seconds.
- In this case we expect to receive a message of the form 'Log {log_id}
not found in elasticsearch ...'
- """
- run_id = "wrong_run_id"
- ti = get_ti(
- self.DAG_ID,
- self.TASK_ID,
- run_id,
- pendulum.instance(self.LOGICAL_DATE).add(days=1), # so logs are
not found
- create_task_instance=create_task_instance,
- )
- ts = pendulum.now().add(seconds=-seconds)
- logs, metadatas = self.es_task_handler.read(ti, 1, {"offset": 0,
"last_log_timestamp": str(ts)})
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- if seconds > 5:
- # we expect a log not found message when checking began more
than 5 seconds ago
- expected_pattern = r"^\*\*\* Log .* not found in
Elasticsearch.*"
- assert re.match(expected_pattern, logs[0].event) is not None
- assert metadatas["end_of_log"] is True
+ elif metadata_mode == "empty":
+ logs, metadatas = self.es_task_handler.read(ti, 1, {})
else:
- # we've "waited" less than 5 seconds so it should not be "end
of log" and should be no log message
- assert logs == []
- assert metadatas["end_of_log"] is True
- assert metadatas["offset"] == "0"
- assert timezone.parse(metadatas["last_log_timestamp"]) == ts
- else:
- assert len(logs) == 1
- if seconds > 5:
- # we expect a log not found message when checking began more
than 5 seconds ago
- assert len(logs[0]) == 1
- actual_message = logs[0][0][1]
- expected_pattern = r"^\*\*\* Log .* not found in
Elasticsearch.*"
- assert re.match(expected_pattern, actual_message) is not None
- assert metadatas[0]["end_of_log"] is True
- else:
- # we've "waited" less than 5 seconds so it should not be "end
of log" and should be no log message
- assert len(logs[0]) == 0
- assert logs == [[]]
- assert metadatas[0]["end_of_log"] is True
- assert len(logs) == len(metadatas)
- assert metadatas[0]["offset"] == "0"
- assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts
+ logs, metadatas = self.es_task_handler.read(ti, 1)
- @pytest.mark.db_test
- def test_read_with_match_phrase_query(self, ti):
- similar_log_id = (
- f"{TestElasticsearchTaskHandler.TASK_ID}-"
-
f"{TestElasticsearchTaskHandler.DAG_ID}-2016-01-01T00:00:00+00:00-1"
+ metadata = _assert_log_events(
+ logs,
+ metadatas,
+ expected_events=[self.test_message],
+ expected_sources=["http://localhost:9200"],
)
- another_test_message = "another message"
-
- another_body = {
- "message": another_test_message,
- "log_id": similar_log_id,
- "offset": 1,
- }
- self.es.index(index=self.index_name, doc_type=self.doc_type,
document=another_body, id=1)
-
- ts = pendulum.now()
- logs, metadatas = self.es_task_handler.read(
- ti,
- 1,
- {
- "offset": "0",
- "last_log_timestamp": str(ts),
- "end_of_log": False,
- "max_offset": 2,
- },
- )
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost:9200"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == "some random stuff"
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert self.test_message == logs[0][0][-1]
-
- metadata = metadatas[0]
-
- assert not metadata["end_of_log"]
- assert metadata["offset"] == "1"
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
- @pytest.mark.db_test
- def test_read_with_none_metadata(self, ti):
- logs, metadatas = self.es_task_handler.read(ti, 1)
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost:9200"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == "some random stuff"
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert self.test_message == logs[0][0][-1]
-
- metadata = metadatas[0]
assert not metadata["end_of_log"]
assert metadata["offset"] == "1"
- assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now()
+ assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
@pytest.mark.db_test
- def test_read_nonexistent_log(self, ti):
- ts = pendulum.now()
- # In ElasticMock, search is going to return all documents with
matching index
- # and doc_type regardless of match filters, so we delete the log entry
instead
- # of making a new TaskInstance to query.
- self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
- logs, metadatas = self.es_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False}
- )
- if AIRFLOW_V_3_0_PLUS:
- assert logs == []
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert logs == [[]]
-
- metadata = metadatas[0]
+ def test_read_defaults_offset_when_missing_from_metadata(self, ti):
+ start_time = pendulum.now()
+ with patch.object(self.es_task_handler.io, "_es_read",
return_value=None):
+ logs, metadatas = self.es_task_handler.read(ti, 1, {"end_of_log":
False})
- assert metadata["offset"] == "0"
+ metadata = _assert_no_logs(logs, metadatas)
assert metadata["end_of_log"]
- # last_log_timestamp won't change if no log lines read.
- assert timezone.parse(metadata["last_log_timestamp"]) == ts
+ assert metadata["offset"] == "0"
+ assert timezone.parse(metadata["last_log_timestamp"]) >= start_time
@pytest.mark.db_test
- def test_read_with_empty_metadata(self, ti):
- ts = pendulum.now()
- logs, metadatas = self.es_task_handler.read(ti, 1, {})
- print(f"metadatas: {metadatas}")
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost:9200"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == "some random stuff"
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert self.test_message == logs[0][0][-1]
-
- metadata = metadatas[0]
- print(f"metadatas: {metadatas}")
- assert not metadata["end_of_log"]
- # offset should be initialized to 0 if not provided.
- assert metadata["offset"] == "1"
- # last_log_timestamp will be initialized using log reading time
- # if not last_log_timestamp is provided.
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
- # case where offset is missing but metadata not empty.
- self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
- logs, metadatas = self.es_task_handler.read(ti, 1, {"end_of_log":
False})
- if AIRFLOW_V_3_0_PLUS:
- assert logs == []
+ @pytest.mark.parametrize("seconds", [3, 6])
+ def test_read_missing_logs(self, ti, seconds):
+ start_time = pendulum.now().add(seconds=-seconds)
+ with patch.object(self.es_task_handler.io, "_es_read",
return_value=None):
+ logs, metadatas = self.es_task_handler.read(
+ ti,
+ 1,
+ {"offset": 0, "last_log_timestamp": str(start_time),
"end_of_log": False},
+ )
- metadata = metadatas
+ metadata = _metadata_from_result(metadatas)
+ if seconds > 5:
+ _assert_missing_log_message(logs)
else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert logs == [[]]
-
- metadata = metadatas[0]
+ _assert_no_logs(logs, metadatas)
assert metadata["end_of_log"]
- # offset should be initialized to 0 if not provided.
assert metadata["offset"] == "0"
- # last_log_timestamp will be initialized using log reading time
- # if not last_log_timestamp is provided.
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
+ assert timezone.parse(metadata["last_log_timestamp"]) == start_time
@pytest.mark.db_test
def test_read_timeout(self, ti):
- ts = pendulum.now().subtract(minutes=5)
-
- self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
- # in the below call, offset=1 implies that we have already retrieved
something
- # if we had never retrieved any logs at all (offset=0), then we would
have gotten
- # a "logs not found" message after 5 seconds of trying
- offset = 1
- logs, metadatas = self.es_task_handler.read(
- task_instance=ti,
- try_number=1,
- metadata={
- "offset": offset,
- "last_log_timestamp": str(ts),
- "end_of_log": False,
- },
- )
- if AIRFLOW_V_3_0_PLUS:
- assert logs == []
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert logs == [[]]
-
- metadata = metadatas[0]
+ start_time = pendulum.now().subtract(minutes=5)
+ with patch.object(self.es_task_handler.io, "_es_read",
return_value=None):
+ logs, metadatas = self.es_task_handler.read(
+ task_instance=ti,
+ try_number=1,
+ metadata={
+ "offset": 1,
+ "last_log_timestamp": str(start_time),
+ "end_of_log": False,
+ },
+ )
+ metadata = _assert_no_logs(logs, metadatas)
assert metadata["end_of_log"]
- assert str(offset) == metadata["offset"]
- assert timezone.parse(metadata["last_log_timestamp"]) == ts
+ assert metadata["offset"] == "1"
+ assert timezone.parse(metadata["last_log_timestamp"]) == start_time
@pytest.mark.db_test
- def test_read_as_download_logs(self, ti):
- ts = pendulum.now()
- logs, metadatas = self.es_task_handler.read(
- ti,
- 1,
+ def test_read_with_custom_offset_and_host_fields(self, ti):
+ self.es_task_handler.host_field = "host.name"
+ self.es_task_handler.offset_field = "log.offset"
+ self.es_task_handler.io.host_field = "host.name"
+ self.es_task_handler.io.offset_field = "log.offset"
+ response = _make_es_response(
+ self.es_task_handler.io,
{
- "offset": 0,
- "last_log_timestamp": str(ts),
- "download_logs": True,
- "end_of_log": False,
+ "message": self.test_message,
+ "event": self.test_message,
+ "log_id": self.LOG_ID,
+ "log": {"offset": 1},
+ "host": {"name": "somehostname"},
},
)
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[0].event == "::group::Log message source details"
- assert logs[0].sources == ["http://localhost:9200"]
- assert logs[1].event == "::endgroup::"
- assert logs[2].event == "some random stuff"
-
- metadata = metadatas
- else:
- assert len(logs) == 1
- assert len(logs) == len(metadatas)
- assert len(logs[0]) == 1
- assert self.test_message == logs[0][0][-1]
- metadata = metadatas[0]
+ with patch.object(self.es_task_handler.io, "_es_read",
return_value=response):
+ logs, metadatas = self.es_task_handler.read(
+ ti,
+ 1,
+ {"offset": 0, "last_log_timestamp": str(pendulum.now()),
"end_of_log": False},
+ )
- assert not metadata["end_of_log"]
+ metadata = _assert_log_events(
+ logs,
+ metadatas,
+ expected_events=[self.test_message],
+ expected_sources=["somehostname"],
+ )
assert metadata["offset"] == "1"
- assert timezone.parse(metadata["last_log_timestamp"]) > ts
-
- @pytest.mark.db_test
- def test_read_raises(self, ti):
- with mock.patch.object(self.es_task_handler.io.log, "exception") as
mock_exception:
- with mock.patch.object(self.es_task_handler.io.client, "search")
as mock_execute:
- mock_execute.side_effect = SearchFailedException("Failed to
read")
- log_sources, log_msgs = self.es_task_handler.io.read("", ti)
- assert mock_exception.call_count == 1
- args, kwargs = mock_exception.call_args
- assert "Could not read log with log_id:" in args[0]
-
- if AIRFLOW_V_3_0_PLUS:
- assert log_sources == []
- else:
- assert len(log_sources) == 0
- assert len(log_msgs) == 1
- assert log_sources == []
-
- assert "not found in Elasticsearch" in log_msgs[0]
+ assert not metadata["end_of_log"]
@pytest.mark.db_test
def test_set_context(self, ti):
@@ -598,118 +369,29 @@ class TestElasticsearchTaskHandler:
@pytest.mark.db_test
def test_set_context_w_json_format_and_write_stdout(self, ti):
- formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s
- %(message)s")
- self.es_task_handler.formatter = formatter
- self.es_task_handler.write_stdout = True
- self.es_task_handler.json_format = True
- self.es_task_handler.set_context(ti)
-
- @pytest.mark.db_test
- def test_read_with_json_format(self, ti):
- ts = pendulum.now()
- formatter = logging.Formatter(
- "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s -
%(message)s - %(exc_text)s"
- )
- self.es_task_handler.formatter = formatter
- self.es_task_handler.json_format = True
-
- self.body = {
- "message": self.test_message,
- "event": self.test_message,
- "log_id":
f"{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1",
- "offset": 1,
- "asctime": "2020-12-24 19:25:00,962",
- "filename": "taskinstance.py",
- "lineno": 851,
- "levelname": "INFO",
- }
- self.es_task_handler.set_context(ti)
- self.es.index(index=self.index_name, doc_type=self.doc_type,
document=self.body, id=id)
-
- logs, _ = self.es_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False}
+ self.es_task_handler.formatter = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[2].event == self.test_message
- else:
- assert logs[0][0][1] == self.test_message
-
- @pytest.mark.db_test
- def test_read_with_json_format_with_custom_offset_and_host_fields(self,
ti):
- ts = pendulum.now()
- formatter = logging.Formatter(
- "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s -
%(message)s - %(exc_text)s"
- )
- self.es_task_handler.formatter = formatter
+ self.es_task_handler.write_stdout = True
self.es_task_handler.json_format = True
- self.es_task_handler.host_field = "host.name"
- self.es_task_handler.offset_field = "log.offset"
- self.body = {
- "message": self.test_message,
- "event": self.test_message,
- "log_id": self.LOG_ID,
- "log": {"offset": 1},
- "host": {"name": "somehostname"},
- "asctime": "2020-12-24 19:25:00,962",
- "filename": "taskinstance.py",
- "lineno": 851,
- "levelname": "INFO",
- }
self.es_task_handler.set_context(ti)
- self.es.index(index=self.index_name, doc_type=self.doc_type,
document=self.body, id=id)
-
- logs, _ = self.es_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False}
- )
- if AIRFLOW_V_3_0_PLUS:
- logs = list(logs)
- assert logs[2].event == self.test_message
- else:
- assert logs[0][0][1] == self.test_message
-
- @pytest.mark.db_test
- def test_read_with_custom_offset_and_host_fields(self, ti):
- ts = pendulum.now()
- # Delete the existing log entry as it doesn't have the new offset and
host fields
- self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
- self.es_task_handler.host_field = "host.name"
- self.es_task_handler.offset_field = "log.offset"
-
- self.body = {
- "message": self.test_message,
- "event": self.test_message,
- "log_id": self.LOG_ID,
- "log": {"offset": 1},
- "host": {"name": "somehostname"},
- }
- self.es.index(index=self.index_name, doc_type=self.doc_type,
document=self.body, id=id)
-
- logs, _ = self.es_task_handler.read(
- ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log":
False}
- )
- if AIRFLOW_V_3_0_PLUS:
- pass
- else:
- assert logs[0][0][1] == "some random stuff"
+ assert isinstance(self.es_task_handler.formatter,
ElasticsearchJSONFormatter)
+ assert isinstance(self.es_task_handler.handler, logging.StreamHandler)
+ assert self.es_task_handler.context_set
@pytest.mark.db_test
def test_close(self, ti):
- formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s
- %(message)s")
- self.es_task_handler.formatter = formatter
+ self.es_task_handler.formatter = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+ )
self.es_task_handler.set_context(ti)
self.es_task_handler.close()
- with open(
- os.path.join(self.local_log_location,
self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- # end_of_log_mark may contain characters like '\n' which is needed
to
- # have the log uploaded but will not be stored in elasticsearch.
- # so apply the strip() to log_file.read()
- log_line = log_file.read().strip()
- assert log_line.endswith(self.end_of_log_mark.strip())
+
+ log_file = Path(self.local_log_location) /
self.FILENAME_TEMPLATE.format(try_number=1)
+ assert
log_file.read_text().strip().endswith(self.end_of_log_mark.strip())
assert self.es_task_handler.closed
@pytest.mark.db_test
@@ -717,60 +399,40 @@ class TestElasticsearchTaskHandler:
ti.raw = True
self.es_task_handler.set_context(ti)
self.es_task_handler.close()
- with open(
- os.path.join(self.local_log_location,
self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert self.end_of_log_mark not in log_file.read()
- assert self.es_task_handler.closed
- @pytest.mark.db_test
- def test_close_closed(self, ti):
- self.es_task_handler.closed = True
- self.es_task_handler.set_context(ti)
- self.es_task_handler.close()
- with open(
- os.path.join(self.local_log_location,
self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert len(log_file.read()) == 0
+ log_file = Path(self.local_log_location) /
self.FILENAME_TEMPLATE.format(try_number=1)
+ assert self.end_of_log_mark not in log_file.read_text()
+ assert self.es_task_handler.closed
@pytest.mark.db_test
def test_close_with_no_handler(self, ti):
self.es_task_handler.set_context(ti)
self.es_task_handler.handler = None
self.es_task_handler.close()
- with open(
- os.path.join(self.local_log_location,
self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert len(log_file.read()) == 0
+
+ log_file = Path(self.local_log_location) /
self.FILENAME_TEMPLATE.format(try_number=1)
+ assert log_file.read_text() == ""
assert self.es_task_handler.closed
@pytest.mark.db_test
- def test_close_with_no_stream(self, ti):
+ @pytest.mark.parametrize("stream_state", ["none", "closed"])
+ def test_close_reopens_stream(self, ti, stream_state):
self.es_task_handler.set_context(ti)
- self.es_task_handler.handler.stream = None
- self.es_task_handler.close()
- with open(
- os.path.join(self.local_log_location,
self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert self.end_of_log_mark in log_file.read()
- assert self.es_task_handler.closed
+ if stream_state == "none":
+ self.es_task_handler.handler.stream = None
+ else:
+ self.es_task_handler.handler.stream.close()
- self.es_task_handler.set_context(ti)
- self.es_task_handler.handler.stream.close()
self.es_task_handler.close()
- with open(
- os.path.join(self.local_log_location,
self.FILENAME_TEMPLATE.format(try_number=1))
- ) as log_file:
- assert self.end_of_log_mark in log_file.read()
+
+ log_file = Path(self.local_log_location) /
self.FILENAME_TEMPLATE.format(try_number=1)
+ assert self.end_of_log_mark in log_file.read_text()
assert self.es_task_handler.closed
@pytest.mark.db_test
def test_render_log_id(self, ti):
assert _render_log_id(self.es_task_handler.log_id_template, ti, 1) ==
self.LOG_ID
- self.es_task_handler.json_format = True
- assert _render_log_id(self.es_task_handler.log_id_template, ti, 1) ==
self.LOG_ID
-
def test_clean_date(self):
clean_logical_date = _clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
assert clean_logical_date == "2016_07_08T09_10_11_000012"
@@ -779,35 +441,12 @@ class TestElasticsearchTaskHandler:
@pytest.mark.parametrize(
("json_format", "es_frontend", "expected_url"),
[
- # Common cases
- (
- True,
- "localhost:5601/{log_id}",
- "https://localhost:5601/" + quote(LOG_ID),
- ),
- (
- False,
- "localhost:5601/{log_id}",
- "https://localhost:5601/" + quote(LOG_ID),
- ),
- # Ignore template if "{log_id}"" is missing in the URL
+ (True, "localhost:5601/{log_id}",
"https://localhost:5601/{log_id}"),
+ (False, "localhost:5601/{log_id}",
"https://localhost:5601/{log_id}"),
(False, "localhost:5601", "https://localhost:5601"),
- # scheme handling
- (
- False,
- "https://localhost:5601/path/{log_id}",
- "https://localhost:5601/path/" + quote(LOG_ID),
- ),
- (
- False,
- "http://localhost:5601/path/{log_id}",
- "http://localhost:5601/path/" + quote(LOG_ID),
- ),
- (
- False,
- "other://localhost:5601/path/{log_id}",
- "other://localhost:5601/path/" + quote(LOG_ID),
- ),
+ (False, "https://localhost:5601/path/{log_id}",
"https://localhost:5601/path/{log_id}"),
+ (False, "http://localhost:5601/path/{log_id}",
"http://localhost:5601/path/{log_id}"),
+ (False, "other://localhost:5601/path/{log_id}",
"other://localhost:5601/path/{log_id}"),
],
)
def test_get_external_log_url(self, ti, json_format, es_frontend,
expected_url):
@@ -821,8 +460,9 @@ class TestElasticsearchTaskHandler:
offset_field=self.offset_field,
frontend=es_frontend,
)
- url = es_task_handler.get_external_log_url(ti, ti.try_number)
- assert expected_url == url
+ assert es_task_handler.get_external_log_url(ti, ti.try_number) ==
expected_url.format(
+ log_id=quote(self.LOG_ID)
+ )
@pytest.mark.parametrize(
("frontend", "expected"),
@@ -838,7 +478,6 @@ class TestElasticsearchTaskHandler:
@pytest.mark.db_test
@mock.patch("sys.__stdout__", new_callable=StringIO)
def test_dynamic_offset(self, stdout_mock, ti, time_machine):
- # arrange
handler = ElasticsearchTaskHandler(
base_log_folder=self.local_log_location,
end_of_log_mark=self.end_of_log_mark,
@@ -850,7 +489,7 @@ class TestElasticsearchTaskHandler:
)
handler.formatter = logging.Formatter()
- logger = logging.getLogger(__name__)
+ logger = logging.getLogger("tests.elasticsearch.dynamic_offset")
logger.handlers = [handler]
logger.propagate = False
@@ -858,17 +497,19 @@ class TestElasticsearchTaskHandler:
handler.set_context(ti)
t1 = pendulum.local(year=2017, month=1, day=1, hour=1, minute=1,
second=15)
- t2, t3 = t1 + pendulum.duration(seconds=5), t1 +
pendulum.duration(seconds=10)
-
- # act
- time_machine.move_to(t1, tick=False)
- ti.log.info("Test")
- time_machine.move_to(t2, tick=False)
- ti.log.info("Test2")
- time_machine.move_to(t3, tick=False)
- ti.log.info("Test3")
+ t2 = t1 + pendulum.duration(seconds=5)
+ t3 = t1 + pendulum.duration(seconds=10)
+
+ try:
+ time_machine.move_to(t1, tick=False)
+ ti.log.info("Test")
+ time_machine.move_to(t2, tick=False)
+ ti.log.info("Test2")
+ time_machine.move_to(t3, tick=False)
+ ti.log.info("Test3")
+ finally:
+ logger.handlers = []
- # assert
first_log, second_log, third_log = map(json.loads,
stdout_mock.getvalue().strip().splitlines())
assert first_log["offset"] < second_log["offset"] < third_log["offset"]
assert first_log["asctime"] == t1.format("YYYY-MM-DDTHH:mm:ss.SSSZZ")
@@ -883,12 +524,11 @@ class TestElasticsearchTaskHandler:
self.es_task_handler.io.index_patterns_callable =
"path.to.index_pattern_callable"
result = self.es_task_handler.io._get_index_patterns({})
-
mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
- mock_callable.assert_called_once_with({})
- assert result == "callable_index_pattern"
+
mock_import_string.assert_called_once_with("path.to.index_pattern_callable")
+ mock_callable.assert_called_once_with({})
+ assert result == "callable_index_pattern"
def test_filename_template_for_backward_compatibility(self):
- # filename_template arg support for running the latest provider on
airflow 2
ElasticsearchTaskHandler(
base_log_folder="local/log/location",
end_of_log_mark="end_of_log\n",
@@ -899,129 +539,72 @@ class TestElasticsearchTaskHandler:
)
-def test_safe_attrgetter():
- class A: ...
-
- a = A()
- a.b = "b"
- a.c = None
- a.x = a
- a.x.d = "blah"
- assert getattr_nested(a, "b", None) == "b" # regular getattr
- assert getattr_nested(a, "x.d", None) == "blah" # nested val
- assert getattr_nested(a, "aa", "heya") == "heya" # respects non-none
default
- assert getattr_nested(a, "c", "heya") is None # respects none value
- assert getattr_nested(a, "aa", None) is None # respects none default
-
-
-def test_retrieve_config_keys():
- """
- Tests that the ElasticsearchTaskHandler retrieves the correct
configuration keys from the config file.
- * old_parameters are removed
- * parameters from config are automatically added
- * constructor parameters missing from config are also added
- :return:
- """
- with conf_vars(
- {
- ("elasticsearch_configs", "http_compress"): "False",
- ("elasticsearch_configs", "request_timeout"): "10",
- }
- ):
- args_from_config = get_es_kwargs_from_config().keys()
- # verify_certs comes from default config value
- assert "verify_certs" in args_from_config
- # request_timeout comes from config provided value
- assert "request_timeout" in args_from_config
- # http_compress comes from config value
- assert "http_compress" in args_from_config
- assert "self" not in args_from_config
-
-
-def test_retrieve_retry_on_timeout():
- """
- Test if retrieve timeout is converted to retry_on_timeout.
- """
- with conf_vars(
- {
- ("elasticsearch_configs", "retry_on_timeout"): "True",
- }
- ):
- args_from_config = get_es_kwargs_from_config().keys()
- # verify_certs comes from default config value
- assert "retry_on_timeout" in args_from_config
+class TestTaskHandlerHelpers:
+ def test_safe_attrgetter(self):
+ class A: ...
+ a = A()
+ a.b = "b"
+ a.c = None
+ a.x = a
+ a.x.d = "blah"
+ assert getattr_nested(a, "b", None) == "b"
+ assert getattr_nested(a, "x.d", None) == "blah"
+ assert getattr_nested(a, "aa", "heya") == "heya"
+ assert getattr_nested(a, "c", "heya") is None
+ assert getattr_nested(a, "aa", None) is None
-def test_self_not_valid_arg():
- """
- Test if self is not a valid argument.
- """
- assert "self" not in VALID_ES_CONFIG_KEYS
+ def test_retrieve_config_keys(self):
+ with conf_vars(
+ {
+ ("elasticsearch_configs", "http_compress"): "False",
+ ("elasticsearch_configs", "request_timeout"): "10",
+ }
+ ):
+ args_from_config = get_es_kwargs_from_config().keys()
+ assert "verify_certs" in args_from_config
+ assert "request_timeout" in args_from_config
+ assert "http_compress" in args_from_config
+ assert "self" not in args_from_config
+
+ def test_retrieve_retry_on_timeout(self):
+ with conf_vars(
+ {
+ ("elasticsearch_configs", "retry_on_timeout"): "True",
+ }
+ ):
+ args_from_config = get_es_kwargs_from_config().keys()
+ assert "retry_on_timeout" in args_from_config
+ def test_self_not_valid_arg(self):
+ assert "self" not in VALID_ES_CONFIG_KEYS
[email protected]_test
-class TestElasticsearchRemoteLogIO:
- DAG_ID = "dag_for_testing_es_log_handler"
- TASK_ID = "task_for_testing_es_log_handler"
- RUN_ID = "run_for_testing_es_log_handler"
- LOGICAL_DATE = datetime(2016, 1, 1)
- FILENAME_TEMPLATE = "{try_number}.log"
+class TestElasticsearchRemoteLogIO:
@pytest.fixture(autouse=True)
- def setup_tests(self, ti):
+ def _setup_tests(self, tmp_path):
self.elasticsearch_io = ElasticsearchRemoteLogIO(
write_to_es=True,
write_stdout=True,
delete_local_copy=True,
host="http://localhost:9200",
- base_log_folder=Path(""),
+ base_log_folder=tmp_path,
)
@pytest.fixture
- def tmp_json_file(self):
- with tempfile.TemporaryDirectory() as tmpdir:
- os.makedirs(tmpdir, exist_ok=True)
-
- file_path = os.path.join(tmpdir, "1.log")
- self.tmp_file = file_path
-
- sample_logs = [
- {"message": "start"},
- {"message": "processing"},
- {"message": "end"},
- ]
- with open(file_path, "w") as f:
- for log in sample_logs:
- f.write(json.dumps(log) + "\n")
-
- yield file_path
-
- del self.tmp_file
-
- @pytest.fixture
- def ti(self, create_task_instance, create_log_template):
- create_log_template(
- self.FILENAME_TEMPLATE,
- (
- "{dag_id}-{task_id}-{logical_date}-{try_number}"
- if AIRFLOW_V_3_0_PLUS
- else "{dag_id}-{task_id}-{execution_date}-{try_number}"
- ),
- )
- yield get_ti(
- dag_id=self.DAG_ID,
- task_id=self.TASK_ID,
- run_id=self.RUN_ID,
- logical_date=self.LOGICAL_DATE,
- create_task_instance=create_task_instance,
- )
- clear_db_runs()
- clear_db_dags()
+ def ti(self):
+ return _MockTI()
@pytest.fixture
- def unique_index(self):
- """Generate a unique index name for each test."""
- return f"airflow-logs-{uuid.uuid4()}"
+ def tmp_json_file(self, tmp_path):
+ file_path = tmp_path / "1.log"
+ sample_logs = [
+ {"message": "start"},
+ {"message": "processing"},
+ {"message": "end"},
+ ]
+ file_path.write_text("\n".join(json.dumps(log) for log in sample_logs)
+ "\n")
+ return file_path
def test_write_to_stdout(self, tmp_json_file, ti, capsys):
self.elasticsearch_io.write_to_es = False
@@ -1030,9 +613,7 @@ class TestElasticsearchRemoteLogIO:
captured = capsys.readouterr()
stdout_lines = captured.out.strip().splitlines()
log_entries = [json.loads(line) for line in stdout_lines]
- assert log_entries[0]["message"] == "start"
- assert log_entries[1]["message"] == "processing"
- assert log_entries[2]["message"] == "end"
+ assert [entry["message"] for entry in log_entries] == ["start",
"processing", "end"]
def test_invalid_task_log_file_path(self, ti):
with (
@@ -1041,93 +622,106 @@ class TestElasticsearchRemoteLogIO:
):
self.elasticsearch_io.upload(Path("/invalid/path"), ti)
- mock_parse.assert_not_called()
- mock_write.assert_not_called()
+ mock_parse.assert_not_called()
+ mock_write.assert_not_called()
- def test_raw_log_should_contain_log_id_and_offset(self, tmp_json_file, ti):
- with open(self.tmp_file) as f:
- raw_log = f.read()
- json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, ti)
- assert len(json_log_lines) == 3
- for json_log_line in json_log_lines:
- assert "log_id" in json_log_line
- assert "offset" in json_log_line
-
- @patch("elasticsearch.Elasticsearch.count", return_value={"count": 0})
- def test_read_with_missing_log(self, mocked_count, ti):
- log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+ def test_raw_log_contains_log_id_and_offset(self, tmp_json_file, ti):
+ raw_log = tmp_json_file.read_text()
log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)
- assert log_source_info == []
- assert f"*** Log {log_id} not found in Elasticsearch" in
log_messages[0]
- mocked_count.assert_called_once()
+ json_log_lines = self.elasticsearch_io._parse_raw_log(raw_log, log_id)
- def test_read_error_detail(self, ti):
- """Verify that error_detail is correctly retrieved and formatted."""
- error_detail = [
+ assert len(json_log_lines) == 3
+ assert [line["offset"] for line in json_log_lines] == [1, 2, 3]
+ assert all(line["log_id"] == log_id for line in json_log_lines)
+
+ def test_es_read_builds_expected_query(self, ti):
+ self.elasticsearch_io.client = Mock()
+ self.elasticsearch_io.client.count.return_value = {"count": 1}
+ self.elasticsearch_io.client.search.return_value =
_build_es_search_response(
{
- "is_cause": False,
- "frames": [{"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}],
- "exc_type": "RuntimeError",
- "exc_value": "Woopsie. Something went wrong.",
+ "event": "hello",
+ "log_id":
_render_log_id(self.elasticsearch_io.log_id_template, ti, ti.try_number),
+ "offset": 3,
+ }
+ )
+ self.elasticsearch_io.index_patterns = "airflow-logs-*"
+ log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)
+ query = {
+ "bool": {
+ "filter": [{"range": {self.elasticsearch_io.offset_field:
{"gt": 2}}}],
+ "must": [{"match_phrase": {"log_id": log_id}}],
}
- ]
- body = {
- "event": "Task failed with exception",
- "log_id": _render_log_id(self.elasticsearch_io.log_id_template,
ti, ti.try_number),
- "offset": 1,
- "error_detail": error_detail,
}
- from airflow.providers.elasticsearch.log.es_response import Hit
+ response = self.elasticsearch_io._es_read(log_id, 2, ti)
- mock_hit = Hit({"_source": body})
- with (
- patch.object(self.elasticsearch_io, "_es_read") as mock_es_read,
- patch.object(
- self.elasticsearch_io,
- "_group_logs_by_host",
- return_value={"http://localhost:9200": [mock_hit]},
- ),
- ):
- mock_es_read.return_value = mock.MagicMock()
- mock_es_read.return_value.hits = [mock_hit]
+
self.elasticsearch_io.client.count.assert_called_once_with(index="airflow-logs-*",
query=query)
+ self.elasticsearch_io.client.search.assert_called_once_with(
+ index="airflow-logs-*",
+ query=query,
+ sort=[self.elasticsearch_io.offset_field],
+ size=self.elasticsearch_io.MAX_LINE_PER_PAGE,
+ from_=0,
+ )
+ assert response is not None
+ assert response.hits[0].event == "hello"
- log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+ def test_es_read_returns_none_when_count_is_zero(self, ti):
+ self.elasticsearch_io.client = Mock()
+ self.elasticsearch_io.client.count.return_value = {"count": 0}
- assert len(log_messages) == 1
- log_entry = json.loads(log_messages[0])
- assert "error_detail" in log_entry
- assert log_entry["error_detail"] == error_detail
+ log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)
+ response = self.elasticsearch_io._es_read(log_id, 0, ti)
+ assert response is None
+ self.elasticsearch_io.client.search.assert_not_called()
-# ---------------------------------------------------------------------------
-# Tests for the error_detail helpers (issue #63736)
-# ---------------------------------------------------------------------------
+ def test_es_read_propagates_missing_index(self, ti):
+ self.elasticsearch_io.client = Mock()
+ self.elasticsearch_io.client.count.side_effect =
elasticsearch.exceptions.NotFoundError(
+ 404,
+ "IndexMissingException[[missing] missing]",
+ {},
+ )
+ log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)
+ with pytest.raises(elasticsearch.exceptions.NotFoundError):
+ self.elasticsearch_io._es_read(log_id, 0, ti)
-class TestFormatErrorDetail:
- """Unit tests for _format_error_detail."""
+ def test_es_read_logs_and_returns_none_on_search_error(self, ti):
+ self.elasticsearch_io.client = Mock()
+ self.elasticsearch_io.client.count.return_value = {"count": 1}
+ self.elasticsearch_io.client.search.side_effect = RuntimeError("boom")
- def test_returns_none_for_empty(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
+ log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)
+ with patch.object(self.elasticsearch_io.log, "exception") as
mock_exception:
+ response = self.elasticsearch_io._es_read(log_id, 0, ti)
+
+ assert response is None
+ mock_exception.assert_called_once()
+
+ def test_read_returns_missing_log_message_when_es_read_returns_none(self,
ti):
+ with patch.object(self.elasticsearch_io, "_es_read",
return_value=None):
+ log_source_info, log_messages = self.elasticsearch_io.read("", ti)
+ log_id = _render_log_id(self.elasticsearch_io.log_id_template, ti,
ti.try_number)
+ assert log_source_info == []
+ assert f"*** Log {log_id} not found in Elasticsearch" in
log_messages[0]
+
+
+class TestFormatErrorDetail:
+ def test_returns_none_for_empty(self):
assert _format_error_detail(None) is None
assert _format_error_detail([]) is None
def test_returns_string_for_non_list(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
-
assert _format_error_detail("raw string") == "raw string"
def test_formats_single_exception(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
-
error_detail = [
{
"is_cause": False,
- "frames": [
- {"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"},
- ],
+ "frames": [{"filename": "/app/task.py", "lineno": 13, "name":
"log_and_raise"}],
"exc_type": "RuntimeError",
"exc_value": "Something went wrong.",
"exceptions": [],
@@ -1141,8 +735,6 @@ class TestFormatErrorDetail:
assert "RuntimeError: Something went wrong." in result
def test_formats_chained_exceptions(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
-
error_detail = [
{
"is_cause": True,
@@ -1166,8 +758,6 @@ class TestFormatErrorDetail:
assert "RuntimeError: wrapped" in result
def test_exc_type_without_value(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
-
error_detail = [
{
"is_cause": False,
@@ -1181,19 +771,13 @@ class TestFormatErrorDetail:
assert result.endswith("StopIteration")
def test_non_dict_items_are_stringified(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_format_error_detail
-
result = _format_error_detail(["unexpected string item"])
assert result is not None
assert "unexpected string item" in result
class TestBuildStructuredLogFields:
- """Unit tests for _build_log_fields."""
-
def test_filters_to_allowed_fields(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
-
hit = {"event": "hello", "level": "info", "unknown_field": "should be
dropped"}
result = _build_log_fields(hit)
assert "event" in result
@@ -1201,41 +785,30 @@ class TestBuildStructuredLogFields:
assert "unknown_field" not in result
def test_message_mapped_to_event(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
-
hit = {"message": "plain message", "timestamp": "2024-01-01T00:00:00Z"}
fields = _build_log_fields(hit)
assert fields["event"] == "plain message"
- assert "message" not in fields # Ensure it is popped if used as event
+ assert "message" not in fields
def test_message_preserved_if_event_exists(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
-
hit = {"event": "structured event", "message": "plain message"}
fields = _build_log_fields(hit)
assert fields["event"] == "structured event"
- # message is preserved if it's in TASK_LOG_FIELDS and doesn't collide
with event
assert fields["message"] == "plain message"
def test_levelname_mapped_to_level(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
-
hit = {"event": "msg", "levelname": "ERROR"}
result = _build_log_fields(hit)
assert result["level"] == "ERROR"
assert "levelname" not in result
def test_at_timestamp_mapped_to_timestamp(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
-
hit = {"event": "msg", "@timestamp": "2024-01-01T00:00:00Z"}
result = _build_log_fields(hit)
assert result["timestamp"] == "2024-01-01T00:00:00Z"
assert "@timestamp" not in result
def test_error_detail_is_kept_as_list(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
-
error_detail = [
{
"is_cause": False,
@@ -1244,75 +817,11 @@ class TestBuildStructuredLogFields:
"exc_value": "Woopsie.",
}
]
- hit = {
- "event": "Task failed with exception",
- "error_detail": error_detail,
- }
+ hit = {"event": "Task failed with exception", "error_detail":
error_detail}
result = _build_log_fields(hit)
assert result["error_detail"] == error_detail
def test_error_detail_dropped_when_empty(self):
- from airflow.providers.elasticsearch.log.es_task_handler import
_build_log_fields
-
hit = {"event": "msg", "error_detail": []}
result = _build_log_fields(hit)
assert "error_detail" not in result
-
- @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage
only exists in Airflow 3+")
- @elasticmock
- def test_read_includes_error_detail_in_structured_message(self):
- """End-to-end: a hit with error_detail should surface it in the
returned StructuredLogMessage."""
- from airflow.providers.elasticsearch.log.es_task_handler import
ElasticsearchTaskHandler
-
- local_log_location = "local/log/location"
- handler = ElasticsearchTaskHandler(
- base_log_folder=local_log_location,
- end_of_log_mark="end_of_log\n",
- write_stdout=False,
- json_format=False,
- json_fields="asctime,filename,lineno,levelname,message,exc_text",
- )
-
- es = elasticsearch.Elasticsearch("http://localhost:9200")
- log_id = "test_dag-test_task-test_run--1-1"
- body = {
- "event": "Task failed with exception",
- "log_id": log_id,
- "offset": 1,
- "error_detail": [
- {
- "is_cause": False,
- "frames": [
- {"filename": "/opt/airflow/dags/fail.py", "lineno":
13, "name": "log_and_raise"}
- ],
- "exc_type": "RuntimeError",
- "exc_value": "Woopsie. Something went wrong.",
- }
- ],
- }
- es.index(index="test_index", doc_type="log", document=body, id=1)
-
- # Patch the IO layer to return our fake document
- mock_hit_dict = body.copy()
-
- from airflow.providers.elasticsearch.log.es_response import
ElasticSearchResponse, Hit
-
- mock_hit = Hit({"_source": mock_hit_dict})
- mock_response = mock.MagicMock(spec=ElasticSearchResponse)
- mock_response.hits = [mock_hit]
- mock_response.__iter__ = mock.Mock(return_value=iter([mock_hit]))
- mock_response.__bool__ = mock.Mock(return_value=True)
- mock_response.__getitem__ = mock.Mock(return_value=mock_hit)
-
- with mock.patch.object(handler.io, "_es_read",
return_value=mock_response):
- with mock.patch.object(handler.io, "_group_logs_by_host",
return_value={"localhost": [mock_hit]}):
- # Build StructuredLogMessages
- from airflow.providers.elasticsearch.log.es_task_handler
import _build_log_fields
- from airflow.utils.log.file_task_handler import
StructuredLogMessage
-
- fields = _build_log_fields(mock_hit.to_dict())
- msg = StructuredLogMessage(**fields)
-
- assert msg.event == "Task failed with exception"
- assert hasattr(msg, "error_detail")
- assert msg.error_detail == body["error_detail"]