This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new b28c90354f Deprecate the 2 non-official elasticsearch libraries (#31920)
b28c90354f is described below

commit b28c90354f110bd598ddce193cf82cb1416adbc8
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Sat Jun 24 06:44:51 2023 +0800

    Deprecate the 2 non-official elasticsearch libraries (#31920)

    ---------

    Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com>
---
 airflow/providers/elasticsearch/CHANGELOG.rst      |   6 ++
 .../providers/elasticsearch/hooks/elasticsearch.py |  39 ++++++-
 .../providers/elasticsearch/log/es_task_handler.py | 116 ++++++++++++++++-----
 airflow/providers/elasticsearch/provider.yaml      |   8 +-
 docker_tests/test_prod_image.py                    |   2 +-
 .../index.rst                                      |   2 -
 docs/spelling_wordlist.txt                         |   1 +
 generated/provider_dependencies.json               |   2 -
 tests/models/test_dag.py                           |   4 +-
 .../elasticsearch/log/test_es_task_handler.py      |   6 +-
 10 files changed, 143 insertions(+), 43 deletions(-)

diff --git a/airflow/providers/elasticsearch/CHANGELOG.rst b/airflow/providers/elasticsearch/CHANGELOG.rst
index 0b281154c6..e0f2aa0894 100644
--- a/airflow/providers/elasticsearch/CHANGELOG.rst
+++ b/airflow/providers/elasticsearch/CHANGELOG.rst
@@ -24,6 +24,12 @@ Changelog
 ---------

+5.0.0
+.....
+
+.. note::
+    Deprecate non-official elasticsearch libraries. Only the official elasticsearch library was used
+
 4.5.1
 .....

diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 1a008617bd..bc55762a15 100644
--- a/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -20,9 +20,9 @@ from __future__ import annotations
 import warnings
 from functools import cached_property
 from typing import Any
+from urllib import parse

 from elasticsearch import Elasticsearch
-from es.elastic.api import Connection as ESConnection, connect

 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.hooks.base import BaseHook
@@ -30,6 +30,43 @@ from airflow.models.connection import Connection as AirflowConnection
 from airflow.providers.common.sql.hooks.sql import DbApiHook


+def connect(
+    host: str = "localhost",
+    port: int = 9200,
+    user: str | None = None,
+    password: str | None = None,
+    scheme: str = "http",
+    **kwargs: Any,
+) -> ESConnection:
+    return ESConnection(host, port, user, password, scheme, **kwargs)
+
+
+class ESConnection:
+    """wrapper class for elasticsearch.Elasticsearch."""
+
+    def __init__(
+        self,
+        host: str = "localhost",
+        port: int = 9200,
+        user: str | None = None,
+        password: str | None = None,
+        scheme: str = "http",
+        **kwargs: Any,
+    ):
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.scheme = scheme
+        self.kwargs = kwargs
+        netloc = f"{host}:{port}"
+        self.url = parse.urlunparse((scheme, netloc, "/", None, None, None))
+        if user and password:
+            self.es = Elasticsearch(self.url, http_auth=(user, password), **self.kwargs)
+        else:
+            self.es = Elasticsearch(self.url, **self.kwargs)
+
+
 class ElasticsearchSQLHook(DbApiHook):
     """
     Interact with Elasticsearch through the elasticsearch-dbapi.
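
For context, a rough usage sketch of the connect()/ESConnection wrapper added above. This is illustrative only, not part of the commit, and assumes an Elasticsearch cluster reachable on the default localhost:9200:

    from airflow.providers.elasticsearch.hooks.elasticsearch import connect

    # The wrapper now simply builds a URL and hands it to the official client,
    # replacing the removed es.elastic.api.connect helper.
    conn = connect(host="localhost", port=9200)
    # The underlying elasticsearch.Elasticsearch instance is exposed as `conn.es`.
    print(conn.es.info())
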
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cb949566d6..8c8847fe1a 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@ from urllib.parse import quote
 # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
 import elasticsearch
 import pendulum
-from elasticsearch_dsl import Search
+from elasticsearch.exceptions import ElasticsearchException, NotFoundError

 from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -52,6 +52,34 @@ EsLogMsgType = List[Tuple[str, str]]
 USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")


+class Log:
+    """wrapper class to mimic the attributes in Search class used in elasticsearch_dsl.Search."""
+
+    def __init__(self, offset):
+        self.offset = offset
+
+
+class ElasticSearchResponse:
+    """wrapper class to mimic the Search class used in elasticsearch_dsl.Search."""
+
+    def __init__(self, **kwargs):
+        # Store all provided keyword arguments as attributes of this object
+        for key, value in kwargs.items():
+            if key == "log":
+                setattr(self, key, Log(**value))
+            else:
+                setattr(self, key, value)
+
+    def to_dict(self):
+        result = {}
+        for key in self.__dict__.keys():
+            if key == "log":
+                result[key] = self.__dict__[key].__dict__
+            else:
+                result[key] = self.__dict__[key]
+        return result
+
+
 class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that
@@ -209,12 +237,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         offset = metadata["offset"]
         log_id = self._render_log_id(ti, try_number)
-
         logs = self.es_read(log_id, offset, metadata)
         logs_by_host = self._group_logs_by_host(logs)
-
         next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1])
-
         # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
         # on the client. Sending as a string prevents this issue.
         # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
@@ -259,7 +284,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
             return "\n".join(self._format_msg(lines[i]) for i in range(log_range))

         message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host.items()]
-
         return message, metadata

     def _format_msg(self, log_line):
@@ -287,31 +311,43 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         :param metadata: log metadata, used for steaming log download.
         """
         # Offset is the unique key for sorting logs given log_id.
-        search = (
-            Search(index=self.index_patterns, using=self.client)
-            .query("match_phrase", log_id=log_id)
-            .sort(self.offset_field)
-        )
+        query = {
+            "query": {
+                "bool": {
+                    "must": [
+                        {"match_phrase": {"log_id": log_id}},
+                        {"range": {self.offset_field: {"gt": int(offset)}}},
+                    ]
+                }
+            },
+            "sort": [{self.offset_field: {"order": "asc"}}],
+        }

-        search = search.filter("range", **{self.offset_field: {"gt": int(offset)}})
-        max_log_line = search.count()
-        if "download_logs" in metadata and metadata["download_logs"] and "max_offset" not in metadata:
-            try:
-                if max_log_line > 0:
-                    metadata["max_offset"] = attrgetter(self.offset_field)(
-                        search[max_log_line - 1].execute()[-1]
-                    )
-                else:
-                    metadata["max_offset"] = 0
-            except Exception:
-                self.log.exception("Could not get current log size with log_id: %s", log_id)
+        try:
+            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
+        except NotFoundError as e:
+            self.log.exception("The target index pattern %s does not exist", self.index_patterns)
+            raise e
+        except ElasticsearchException as e:
+            self.log.exception("Could not get current log size with log_id: %s", log_id)
+            raise e

         logs = []
         if max_log_line != 0:
             try:
-
-                logs = search[self.MAX_LINE_PER_PAGE * self.PAGE : self.MAX_LINE_PER_PAGE].execute()
-            except Exception:
+                res = self.client.search(
+                    index=self.index_patterns,
+                    body=query,
+                    size=self.MAX_LINE_PER_PAGE,
+                    from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+                )
+                logs = [
+                    ElasticSearchResponse(
+                        **unwrap_response(response),
+                    )
+                    for response in res["hits"]["hits"]
+                ]
+            except elasticsearch.exceptions.ElasticsearchException:
                 self.log.exception("Could not read log with log_id: %s", log_id)

         return logs
@@ -429,3 +465,33 @@ def getattr_nested(obj, item, default):
         return attrgetter(item)(obj)
     except AttributeError:
         return default
+
+
+def unwrap_response(res):
+    source = res["_source"]
+    transformed = {
+        "log_id": source.get("log_id"),
+        "message": source.get("message"),
+        "meta": {
+            "id": res.get("_id"),
+            "index": res.get("_index"),
+            "version": res.get("_version"),
+            "headers": res.get("_headers"),
+        },
+    }
+    if "offset" in source:
+        transformed["offset"] = source["offset"]
+    if "asctime" in source:
+        transformed["asctime"] = source["asctime"]
+    if "filename" in source:
+        transformed["filename"] = source["filename"]
+    if "host" in source:
+        transformed["host"] = source["host"]
+    if "levelname" in source:
+        transformed["levelname"] = source["levelname"]
+    if "lineno" in source:
+        transformed["lineno"] = source["lineno"]
+    if "log" in source:
+        transformed["log"] = source["log"]
+
+    return transformed
diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml
index b7b34a497b..2b6ffaabf2 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -23,6 +23,7 @@ description: |

 suspended: false
 versions:
+  - 5.0.0
   - 4.5.1
   - 4.5.0
   - 4.4.0
@@ -52,14 +53,7 @@ versions:
 dependencies:
   - apache-airflow>=2.4.0
   - apache-airflow-providers-common-sql>=1.3.1
-  # We cannot use elasticsearch 8 yet. The elasticsearch-dsl is not compatible with it.
-  # elasticsearch>=7.15.0 breaks our tests.The eleasticsearch 7 is an old version that is not
-  # supported anymore. We should move to elasticsearch 8 as
-  # likely requires getting rid of elasticsearch-dsl or waiting until there is a compatible version
-  # We can also try to replace the <7.15.0 with <8.0.0 but we need to solve the test failures first
   - elasticsearch>7,<7.15.0
-  - elasticsearch-dbapi
-  - elasticsearch-dsl>=5.0.0

 integrations:
   - integration-name: Elasticsearch
diff --git a/docker_tests/test_prod_image.py b/docker_tests/test_prod_image.py
index e76f04dfd8..46cba5fbfa 100644
--- a/docker_tests/test_prod_image.py
+++ b/docker_tests/test_prod_image.py
@@ -127,7 +127,7 @@ class TestPythonPackages:
         "cncf.kubernetes": ["kubernetes", "cryptography"],
         "dask": ["cloudpickle", "distributed"],
         "docker": ["docker"],
-        "elasticsearch": ["elasticsearch", "es.elastic", "elasticsearch_dsl"],
+        "elasticsearch": ["elasticsearch"],
         "google": [
             "OpenSSL",
             # "google.ads", Remove google ads as it is vendored in google provider now
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst b/docs/apache-airflow-providers-elasticsearch/index.rst
index 24451a7921..9d9f0b24cb 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -92,8 +92,6 @@ PIP package                              Version required
 ``apache-airflow``                       ``>=2.4.0``
 ``apache-airflow-providers-common-sql``  ``>=1.3.1``
 ``elasticsearch``                        ``>7,<7.15.0``
-``elasticsearch-dbapi``
-``elasticsearch-dsl``                    ``>=5.0.0``
 =======================================  ==================

 Cross provider package dependencies
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index babb506f41..d2a759e931 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -486,6 +486,7 @@ Drivy
 dropdown
 druidHook
 ds
+dsl
 Dsn
 dsn
 dttm
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 8d14eb5806..2a1ae40e3f 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -336,8 +336,6 @@
     "deps": [
       "apache-airflow-providers-common-sql>=1.3.1",
       "apache-airflow>=2.4.0",
-      "elasticsearch-dbapi",
-      "elasticsearch-dsl>=5.0.0",
       "elasticsearch>7,<7.15.0"
     ],
     "cross-providers-deps": [
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index fa2bcd7052..3c739c115f 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3642,7 +3642,7 @@ class TestTaskClearingSetupTeardownBehavior:
             with s2 >> t2:
                 BaseOperator(task_id="w2")
         BaseOperator(task_id="w3")
-        # todo: implement tests
+        # to_do: implement tests

     def test_get_flat_relative_ids_with_setup_nested_no_ctx_mgr(self):
         """Let's test some gnarlier cases here"""
@@ -3773,7 +3773,7 @@ class TestTaskClearingSetupTeardownBehavior:
         g2_group_teardown = dag.task_dict["g2.group_teardown"]

         with pytest.raises(Exception):
-            # fixme
+            # fix_me
             # the line `dag_setup >> tg >> dag_teardown` should be equivalent to
             # dag_setup >> group_setup; w3 >> dag_teardown
             # i.e. not group_teardown >> dag_teardown
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index e3ba8f47ae..d402cec8aa 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -29,6 +29,7 @@ from urllib.parse import quote
 import elasticsearch
 import pendulum
 import pytest
+from elasticsearch.exceptions import ElasticsearchException

 from airflow.configuration import conf
 from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested
@@ -341,13 +342,12 @@ class TestElasticsearchTaskHandler:

     def test_read_raises(self, ti):
         with mock.patch.object(self.es_task_handler.log, "exception") as mock_exception:
-            with mock.patch("elasticsearch_dsl.Search.execute") as mock_execute:
-                mock_execute.side_effect = Exception("Failed to read")
+            with mock.patch.object(self.es_task_handler.client, "search") as mock_execute:
+                mock_execute.side_effect = ElasticsearchException("Failed to read")
                 logs, metadatas = self.es_task_handler.read(ti, 1)

         assert mock_exception.call_count == 1
         args, kwargs = mock_exception.call_args
         assert "Could not read log with log_id:" in args[0]
-        assert 1 == len(logs)
         assert len(logs) == len(metadatas)
         assert [[]] == logs
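
For reference, a quick sketch of how the new unwrap_response and ElasticSearchResponse helpers added in log/es_task_handler.py reshape a raw search hit before the handler sorts on the offset field. This is illustrative only; the hit below is a made-up sample, not data from the commit:

    from airflow.providers.elasticsearch.log.es_task_handler import (
        ElasticSearchResponse,
        unwrap_response,
    )

    # A hit in the shape returned by the Elasticsearch search API (sample values).
    hit = {
        "_id": "abc123",
        "_index": "airflow-logs",
        "_source": {
            "log_id": "example_dag-example_task-2023-06-24-1",
            "message": "Task started",
            "offset": 1687560000000,
        },
    }

    # unwrap_response() flattens the hit into plain keys; ElasticSearchResponse
    # then exposes them as attributes, mimicking the old elasticsearch_dsl objects.
    doc = ElasticSearchResponse(**unwrap_response(hit))
    print(doc.offset, doc.message)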