This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b28c90354f Deprecate the 2 non-official elasticsearch libraries (#31920)
b28c90354f is described below

commit b28c90354f110bd598ddce193cf82cb1416adbc8
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Sat Jun 24 06:44:51 2023 +0800

    Deprecate the 2 non-official elasticsearch libraries (#31920)
    
    
    
    ---------
    
    Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com>
---
 airflow/providers/elasticsearch/CHANGELOG.rst      |   6 ++
 .../providers/elasticsearch/hooks/elasticsearch.py |  39 ++++++-
 .../providers/elasticsearch/log/es_task_handler.py | 116 ++++++++++++++++-----
 airflow/providers/elasticsearch/provider.yaml      |   8 +-
 docker_tests/test_prod_image.py                    |   2 +-
 .../index.rst                                      |   2 -
 docs/spelling_wordlist.txt                         |   1 +
 generated/provider_dependencies.json               |   2 -
 tests/models/test_dag.py                           |   4 +-
 .../elasticsearch/log/test_es_task_handler.py      |   6 +-
 10 files changed, 143 insertions(+), 43 deletions(-)

diff --git a/airflow/providers/elasticsearch/CHANGELOG.rst b/airflow/providers/elasticsearch/CHANGELOG.rst
index 0b281154c6..e0f2aa0894 100644
--- a/airflow/providers/elasticsearch/CHANGELOG.rst
+++ b/airflow/providers/elasticsearch/CHANGELOG.rst
@@ -24,6 +24,12 @@
 Changelog
 ---------
 
+5.0.0
+.....
+
+.. note::
+  Deprecate non-official elasticsearch libraries. Only the official elasticsearch library is now used.
+
 4.5.1
 .....
 
diff --git a/airflow/providers/elasticsearch/hooks/elasticsearch.py b/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 1a008617bd..bc55762a15 100644
--- a/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -20,9 +20,9 @@ from __future__ import annotations
 import warnings
 from functools import cached_property
 from typing import Any
+from urllib import parse
 
 from elasticsearch import Elasticsearch
-from es.elastic.api import Connection as ESConnection, connect
 
 from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.hooks.base import BaseHook
@@ -30,6 +30,43 @@ from airflow.models.connection import Connection as AirflowConnection
 from airflow.providers.common.sql.hooks.sql import DbApiHook
 
 
+def connect(
+    host: str = "localhost",
+    port: int = 9200,
+    user: str | None = None,
+    password: str | None = None,
+    scheme: str = "http",
+    **kwargs: Any,
+) -> ESConnection:
+    return ESConnection(host, port, user, password, scheme, **kwargs)
+
+
+class ESConnection:
+    """wrapper class for elasticsearch.Elasticsearch."""
+
+    def __init__(
+        self,
+        host: str = "localhost",
+        port: int = 9200,
+        user: str | None = None,
+        password: str | None = None,
+        scheme: str = "http",
+        **kwargs: Any,
+    ):
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.scheme = scheme
+        self.kwargs = kwargs
+        netloc = f"{host}:{port}"
+        self.url = parse.urlunparse((scheme, netloc, "/", None, None, None))
+        if user and password:
+            self.es = Elasticsearch(self.url, http_auth=(user, password), **self.kwargs)
+        else:
+            self.es = Elasticsearch(self.url, **self.kwargs)
+
+
 class ElasticsearchSQLHook(DbApiHook):
     """
     Interact with Elasticsearch through the elasticsearch-dbapi.
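
For reference, a minimal sketch of how the new in-provider connect() wrapper can be exercised once this lands. The host, port, and scheme are the defaults from the code above; the final call assumes an Elasticsearch node is actually listening there (illustrative, not part of the commit):

    from airflow.providers.elasticsearch.hooks.elasticsearch import connect

    # Build the thin ESConnection wrapper; with no user/password supplied,
    # the underlying Elasticsearch client is created without http_auth.
    conn = connect(host="localhost", port=9200, scheme="http")
    print(conn.url)        # "http://localhost:9200/" via parse.urlunparse
    info = conn.es.info()  # conn.es is the raw elasticsearch.Elasticsearch client
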
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index cb949566d6..8c8847fe1a 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -30,7 +30,7 @@ from urllib.parse import quote
 # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
 import elasticsearch
 import pendulum
-from elasticsearch_dsl import Search
+from elasticsearch.exceptions import ElasticsearchException, NotFoundError
 
 from airflow.configuration import conf
 from airflow.exceptions import AirflowProviderDeprecationWarning
@@ -52,6 +52,34 @@ EsLogMsgType = List[Tuple[str, str]]
 USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
 
 
+class Log:
+    """wrapper class to mimic the attributes in Search class used in 
elasticsearch_dsl.Search."""
+
+    def __init__(self, offset):
+        self.offset = offset
+
+
+class ElasticSearchResponse:
+    """wrapper class to mimic the Search class used in 
elasticsearch_dsl.Search."""
+
+    def __init__(self, **kwargs):
+        # Store all provided keyword arguments as attributes of this object
+        for key, value in kwargs.items():
+            if key == "log":
+                setattr(self, key, Log(**value))
+            else:
+                setattr(self, key, value)
+
+    def to_dict(self):
+        result = {}
+        for key in self.__dict__.keys():
+            if key == "log":
+                result[key] = self.__dict__[key].__dict__
+            else:
+                result[key] = self.__dict__[key]
+        return result
+
+
 class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
     """
     ElasticsearchTaskHandler is a python log handler that
@@ -209,12 +237,9 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
 
         offset = metadata["offset"]
         log_id = self._render_log_id(ti, try_number)
-
         logs = self.es_read(log_id, offset, metadata)
         logs_by_host = self._group_logs_by_host(logs)
-
         next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1])
-
         # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
         # on the client. Sending as a string prevents this issue.
         # https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/MAX_SAFE_INTEGER
@@ -259,7 +284,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
             return "\n".join(self._format_msg(lines[i]) for i in 
range(log_range))
 
         message = [(host, concat_logs(hosted_log)) for host, hosted_log in logs_by_host.items()]
-
         return message, metadata
 
     def _format_msg(self, log_line):
@@ -287,31 +311,43 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         :param metadata: log metadata, used for streaming log download.
         """
         # Offset is the unique key for sorting logs given log_id.
-        search = (
-            Search(index=self.index_patterns, using=self.client)
-            .query("match_phrase", log_id=log_id)
-            .sort(self.offset_field)
-        )
+        query = {
+            "query": {
+                "bool": {
+                    "must": [
+                        {"match_phrase": {"log_id": log_id}},
+                        {"range": {self.offset_field: {"gt": int(offset)}}},
+                    ]
+                }
+            },
+            "sort": [{self.offset_field: {"order": "asc"}}],
+        }
 
-        search = search.filter("range", **{self.offset_field: {"gt": 
int(offset)}})
-        max_log_line = search.count()
-        if "download_logs" in metadata and metadata["download_logs"] and 
"max_offset" not in metadata:
-            try:
-                if max_log_line > 0:
-                    metadata["max_offset"] = attrgetter(self.offset_field)(
-                        search[max_log_line - 1].execute()[-1]
-                    )
-                else:
-                    metadata["max_offset"] = 0
-            except Exception:
-                self.log.exception("Could not get current log size with 
log_id: %s", log_id)
+        try:
+            max_log_line = self.client.count(index=self.index_patterns, body=query)["count"]
+        except NotFoundError as e:
+            self.log.exception("The target index pattern %s does not exist", 
self.index_patterns)
+            raise e
+        except ElasticsearchException as e:
+            self.log.exception("Could not get current log size with log_id: 
%s", log_id)
+            raise e
 
         logs = []
         if max_log_line != 0:
             try:
-
-                logs = search[self.MAX_LINE_PER_PAGE * self.PAGE : self.MAX_LINE_PER_PAGE].execute()
-            except Exception:
+                res = self.client.search(
+                    index=self.index_patterns,
+                    body=query,
+                    size=self.MAX_LINE_PER_PAGE,
+                    from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+                )
+                logs = [
+                    ElasticSearchResponse(
+                        **unwrap_response(response),
+                    )
+                    for response in res["hits"]["hits"]
+                ]
+            except elasticsearch.exceptions.ElasticsearchException:
                 self.log.exception("Could not read log with log_id: %s", 
log_id)
 
         return logs
@@ -429,3 +465,33 @@ def getattr_nested(obj, item, default):
         return attrgetter(item)(obj)
     except AttributeError:
         return default
+
+
+def unwrap_response(res):
+    source = res["_source"]
+    transformed = {
+        "log_id": source.get("log_id"),
+        "message": source.get("message"),
+        "meta": {
+            "id": res.get("_id"),
+            "index": res.get("_index"),
+            "version": res.get("_version"),
+            "headers": res.get("_headers"),
+        },
+    }
+    if "offset" in source:
+        transformed["offset"] = source["offset"]
+    if "asctime" in source:
+        transformed["asctime"] = source["asctime"]
+    if "filename" in source:
+        transformed["filename"] = source["filename"]
+    if "host" in source:
+        transformed["host"] = source["host"]
+    if "levelname" in source:
+        transformed["levelname"] = source["levelname"]
+    if "lineno" in source:
+        transformed["lineno"] = source["lineno"]
+    if "log" in source:
+        transformed["log"] = source["log"]
+
+    return transformed
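
For reference, a standalone sketch of the raw query pattern that replaces the old elasticsearch_dsl Search chain above. The index pattern, log_id, and offset values are illustrative; the sort clause is attached only for search(), since _count evaluates just the query portion:

    from elasticsearch import Elasticsearch

    client = Elasticsearch("http://localhost:9200")
    offset_field = "offset"
    body = {
        "query": {
            "bool": {
                "must": [
                    {"match_phrase": {"log_id": "my_dag-my_task-run_id-1"}},
                    {"range": {offset_field: {"gt": 0}}},
                ]
            }
        }
    }
    # Count the matching log lines first, then page through them in offset order.
    total = client.count(index="airflow-logs-*", body=body)["count"]
    if total:
        body["sort"] = [{offset_field: {"order": "asc"}}]
        res = client.search(index="airflow-logs-*", body=body, size=100, from_=0)
        hits = res["hits"]["hits"]
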
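And a small round-trip through the new wrapper classes, using a made-up hit shaped like one entry of res["hits"]["hits"] (field values invented for the example):

    # Illustrative raw hit as returned inside res["hits"]["hits"].
    raw_hit = {
        "_id": "x1",
        "_index": "airflow-logs-2023.06.24",
        "_source": {
            "log_id": "my_dag-my_task-run_id-1",
            "message": "Task started",
            "offset": 1687560000000,
            "log": {"offset": 1687560000000},
        },
    }

    doc = ElasticSearchResponse(**unwrap_response(raw_hit))
    assert doc.offset == 1687560000000      # plain _source fields become attributes
    assert doc.log.offset == 1687560000000  # the nested "log" dict becomes a Log object
    assert doc.to_dict()["log"] == {"offset": 1687560000000}
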
diff --git a/airflow/providers/elasticsearch/provider.yaml b/airflow/providers/elasticsearch/provider.yaml
index b7b34a497b..2b6ffaabf2 100644
--- a/airflow/providers/elasticsearch/provider.yaml
+++ b/airflow/providers/elasticsearch/provider.yaml
@@ -23,6 +23,7 @@ description: |
 
 suspended: false
 versions:
+  - 5.0.0
   - 4.5.1
   - 4.5.0
   - 4.4.0
@@ -52,14 +53,7 @@ versions:
 dependencies:
   - apache-airflow>=2.4.0
   - apache-airflow-providers-common-sql>=1.3.1
-  # We cannot use elasticsearch 8 yet. The elasticsearch-dsl is not compatible with it.
-  # elasticsearch>=7.15.0 breaks our tests.The eleasticsearch 7 is an old version that is not
-  # supported anymore. We should move to elasticsearch 8 as
-  # likely requires getting rid of elasticsearch-dsl or waiting until there is a compatible version
-  # We can also try to replace the <7.15.0 with <8.0.0 but we need to solve the test failures first
   - elasticsearch>7,<7.15.0
-  - elasticsearch-dbapi
-  - elasticsearch-dsl>=5.0.0
 
 integrations:
   - integration-name: Elasticsearch
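
A quick way to sanity-check the trimmed dependency set in a local environment; a sketch only, assuming the packaging distribution is installed (not part of the commit):

    from importlib.metadata import version

    from packaging.version import Version

    # elasticsearch must satisfy the pinned range >7,<7.15.0;
    # elasticsearch-dbapi and elasticsearch-dsl are no longer required.
    es_version = Version(version("elasticsearch"))
    assert Version("7") < es_version < Version("7.15.0")
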
diff --git a/docker_tests/test_prod_image.py b/docker_tests/test_prod_image.py
index e76f04dfd8..46cba5fbfa 100644
--- a/docker_tests/test_prod_image.py
+++ b/docker_tests/test_prod_image.py
@@ -127,7 +127,7 @@ class TestPythonPackages:
         "cncf.kubernetes": ["kubernetes", "cryptography"],
         "dask": ["cloudpickle", "distributed"],
         "docker": ["docker"],
-        "elasticsearch": ["elasticsearch", "es.elastic", "elasticsearch_dsl"],
+        "elasticsearch": ["elasticsearch"],
         "google": [
             "OpenSSL",
             # "google.ads", Remove google ads as it is vendored in google 
provider now
diff --git a/docs/apache-airflow-providers-elasticsearch/index.rst b/docs/apache-airflow-providers-elasticsearch/index.rst
index 24451a7921..9d9f0b24cb 100644
--- a/docs/apache-airflow-providers-elasticsearch/index.rst
+++ b/docs/apache-airflow-providers-elasticsearch/index.rst
@@ -92,8 +92,6 @@ PIP package                              Version required
 ``apache-airflow``                       ``>=2.4.0``
 ``apache-airflow-providers-common-sql``  ``>=1.3.1``
 ``elasticsearch``                        ``>7,<7.15.0``
-``elasticsearch-dbapi``
-``elasticsearch-dsl``                    ``>=5.0.0``
 =======================================  ==================
 
 Cross provider package dependencies
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index babb506f41..d2a759e931 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -486,6 +486,7 @@ Drivy
 dropdown
 druidHook
 ds
+dsl
 Dsn
 dsn
 dttm
diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json
index 8d14eb5806..2a1ae40e3f 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -336,8 +336,6 @@
     "deps": [
       "apache-airflow-providers-common-sql>=1.3.1",
       "apache-airflow>=2.4.0",
-      "elasticsearch-dbapi",
-      "elasticsearch-dsl>=5.0.0",
       "elasticsearch>7,<7.15.0"
     ],
     "cross-providers-deps": [
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index fa2bcd7052..3c739c115f 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3642,7 +3642,7 @@ class TestTaskClearingSetupTeardownBehavior:
                 with s2 >> t2:
                     BaseOperator(task_id="w2")
                     BaseOperator(task_id="w3")
-        # todo: implement tests
+        # to_do: implement tests
 
     def test_get_flat_relative_ids_with_setup_nested_no_ctx_mgr(self):
         """Let's test some gnarlier cases here"""
@@ -3773,7 +3773,7 @@ class TestTaskClearingSetupTeardownBehavior:
         g2_group_teardown = dag.task_dict["g2.group_teardown"]
 
         with pytest.raises(Exception):
-            # fixme
+            # fix_me
             #   the line `dag_setup >> tg >> dag_teardown` should be equivalent to
             #   dag_setup >> group_setup; w3 >> dag_teardown
             #   i.e. not group_teardown >> dag_teardown
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index e3ba8f47ae..d402cec8aa 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -29,6 +29,7 @@ from urllib.parse import quote
 import elasticsearch
 import pendulum
 import pytest
+from elasticsearch.exceptions import ElasticsearchException
 
 from airflow.configuration import conf
 from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler, getattr_nested
@@ -341,13 +342,12 @@ class TestElasticsearchTaskHandler:
 
     def test_read_raises(self, ti):
         with mock.patch.object(self.es_task_handler.log, "exception") as 
mock_exception:
-            with mock.patch("elasticsearch_dsl.Search.execute") as 
mock_execute:
-                mock_execute.side_effect = Exception("Failed to read")
+            with mock.patch.object(self.es_task_handler.client, "search") as mock_execute:
+                mock_execute.side_effect = ElasticsearchException("Failed to read")
                 logs, metadatas = self.es_task_handler.read(ti, 1)
             assert mock_exception.call_count == 1
             args, kwargs = mock_exception.call_args
             assert "Could not read log with log_id:" in args[0]
-
         assert 1 == len(logs)
         assert len(logs) == len(metadatas)
         assert [[]] == logs
