This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fdbb9b01a4f fix: pin compatible-with at the transport layer to keep ES 8 servers working (#66065)
fdbb9b01a4f is described below

commit fdbb9b01a4fff447ae558e04355f4ebba5d3ef99
Author: Peter <[email protected]>
AuthorDate: Tue May 19 19:21:12 2026 +0900

    fix: pin compatible-with at the transport layer to keep ES 8 servers working (#66065)
    
    * fix(elasticsearch): pin compatible-with at the transport layer to keep ES 8 servers working
    
    Since #64070 the provider depends on elasticsearch>=8.10,<10. A default
    install resolves to an elasticsearch>=9 Python client, which always
    negotiates 'compatible-with=9' on every request. Elasticsearch 8.x
    servers reject that with HTTP 400 media_type_header_exception, breaking
    remote task log ingestion and both ElasticsearchSQLHook and
    ElasticsearchPythonHook against ES 8 clusters.
    
    Add a [elasticsearch] es_compat_with config option that, when set to a
    major version string ('7'/'8'/'9'), wraps the client's transport
    perform_request so every outbound request carries
    'Accept: application/vnd.elasticsearch+json; compatible-with=<major>'
    (and the matching '+x-ndjson' form for bulk so streaming bodies still
    parse). The wrap is applied at every Elasticsearch client construction
    site in the provider:
    
      - ElasticsearchTaskHandler  (log/es_task_handler.py)
      - ElasticsearchRemoteLogIO  (log/es_task_handler.py)
      - ESConnection              (hooks/elasticsearch.py)
      - ElasticsearchPythonHook   (hooks/elasticsearch.py)
    
    When the option is unset, behavior is unchanged.
    
    Tests assert against what the transport actually sends, not the in-memory
    state of the client object. Setting client._headers (which is what
    client.options(headers=...) does) is not enough because elasticsearch-py
    re-applies its own per-API-method content-negotiation headers right
    before the request is sent — only the transport layer sees the final
    headers.
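    
    A minimal sketch of why the transport is the only reliable observation
    point. ToyClient and ToyTransport are hypothetical stand-ins, not
    elasticsearch-py classes; the sketch assumes only what is described
    above, namely that per-call headers are merged over client-level headers
    just before the request reaches the transport.

```python
class ToyTransport:
    """Hypothetical stand-in for a transport; records what it is asked to send."""

    def __init__(self):
        self.sent = []

    def perform_request(self, method, target, *, headers=None):
        self.sent.append(dict(headers or {}))


class ToyClient:
    """Hypothetical client that re-applies per-API headers on every call."""

    def __init__(self, transport, headers=None):
        self.transport = transport
        self._headers = dict(headers or {})

    def search(self):
        # Per-call negotiation wins over anything stored on the client.
        per_call = {"accept": "application/vnd.elasticsearch+json; compatible-with=9"}
        merged = {**self._headers, **per_call}
        return self.transport.perform_request("POST", "/_search", headers=merged)


pinned = "application/vnd.elasticsearch+json; compatible-with=8"
client = ToyClient(ToyTransport(), headers={"accept": pinned})
client.search()

# The in-memory state looks pinned, but the recorded request still carries
# the client's own major version.
in_memory = client._headers["accept"]
on_wire = client.transport.sent[-1]["accept"]
```

    In this toy model an assertion on client._headers would pass while the
    request actually sent is still unpinned, which is the trap the tests
    avoid.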
    
    Closes: https://github.com/apache/airflow/issues/66063
    Supersedes: https://github.com/apache/airflow/pull/66064
    
    * fixup(elasticsearch): tighten apply_compat_with: idempotent guard, simpler ct check, correct test assertion
    
    - idempotent guard: skip second wrap by checking transport.__dict__ for an
      existing instance attribute. Repeated apply_compat_with calls (e.g.
      hook reuse paths) are now true no-ops.
    - content-type check simplified: '"ndjson" in ct' already matches both
      'ndjson' and '+x-ndjson' so the redundant 'x-ndjson' branch is dropped.
    - unset-case test was using 'transport.perform_request is original', which
      would fail even when nothing was wrapped, because every attribute access
      on a method creates a fresh bound-method object. Switched to
      inspecting transport.__dict__ for the 'perform_request' key, which
      precisely tracks whether the helper installed an instance override.
    - new test_apply_compat_with_is_idempotent asserts the guard above.
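    
    The identity pitfall from the third bullet can be reproduced with plain
    Python. The Transport class below is a hypothetical stand-in, not the
    elastic_transport one.

```python
class Transport:
    """Hypothetical stand-in with a class-level method."""

    def perform_request(self):
        return "original"


t = Transport()

# Every attribute access builds a new bound-method object, so an identity
# check fails even though nothing has been replaced.
first = t.perform_request
second = t.perform_request
identity_matches = first is second
equality_matches = first == second

# The instance __dict__ stays empty until an override is installed on it.
has_instance_override = "perform_request" in vars(t)
```

    Equality still holds between the two bound methods, but checking the
    instance __dict__ answers the question the test actually cares about:
    whether an override was installed on this object.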
    
    * fixup(elasticsearch): mirror upstream selective mimetype rewrite
    
    The previous wrapper unconditionally overwrote the entire `Accept` header
    to `application/vnd.elasticsearch+json; compatible-with=<major>` whenever
    one was present. That is too aggressive: elasticsearch-py emits
    non-JSON `Accept` values for several APIs that still need to flow through
    the same transport. Notably:
    
    - `client.cat.help()` sends `Accept: text/plain`.
    - All other `client.cat.*` endpoints send `Accept: text/plain,application/json`.
    - Search-MVT endpoints send `Accept: application/vnd.mapbox-vector-tile`.
    
    After the previous wrap every one of those calls went on the wire as
    plain `application/vnd.elasticsearch+json; compatible-with=<N>`, silently
    turning cat responses into JSON for any operator using
    `ElasticsearchPythonHook.get_conn()` to call cat APIs.
    
    Mirror upstream's own `mimetype_header_to_compat` instead: only
    `application/(json|x-ndjson|vnd.mapbox-vector-tile)` parts of the header
    get the `compatible-with=<configured>` suffix, anything else is left
    verbatim. The regex also matches the already-rewritten
    `application/vnd.elasticsearch+<x>; compatible-with=<N>` form that
    elasticsearch-py 9.x ships before the transport sees the request, so the
    configured major actually replaces the client default major on the wire
    (verified with a Transport spy against elasticsearch-py 9.3.0).
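    
    A standalone sketch of the selective rewrite. The pattern and the
    substitution template are copied from the `_compat.py` added in this
    commit; the `pin` helper name is hypothetical and exists only for this
    illustration.

```python
import re

# Pattern copied from the commit's _compat.py.
COMPAT_MIMETYPE_RE = re.compile(
    r"application/(?:vnd\.elasticsearch\+)?(json|x-ndjson|vnd\.mapbox-vector-tile)"
    r"(?:\s*;\s*compatible-with=\d+)?"
)


def pin(header_value: str, major: str) -> str:
    """Rewrite only the negotiable mimetypes inside a header value."""
    sub = rf"application/vnd.elasticsearch+\g<1>; compatible-with={major}"
    return COMPAT_MIMETYPE_RE.sub(sub, header_value)


plain = pin("text/plain", "8")
mixed = pin("text/plain,application/json", "8")
already_rewritten = pin("application/vnd.elasticsearch+json; compatible-with=9", "8")
bulk = pin("application/vnd.elasticsearch+x-ndjson; compatible-with=9", "8")
```

    `plain` is returned untouched, `mixed` keeps its `text/plain` part and
    gains the suffix only on the JSON part, and the two already-rewritten
    values have their major replaced rather than duplicated.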
    
    Two adjacent hardenings while we are in here:
    
    - Strip whitespace from the config value and reject anything that is not a
      positive integer string with `AirflowConfigException` at construction
      time, so a typo like `es_compat_with = 'v8'` fails fast in the worker
      startup log instead of returning a 400 storm per request.
    - Walk header keys case-insensitively, so a future `elastic_transport`
      that forwards PascalCase `Accept` / `Content-Type` keys cannot silently
      bypass the rewrite.
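    
    Both hardenings can be sketched with the standard library alone. The
    function names are hypothetical, and ValueError stands in for
    `AirflowConfigException` so the sketch runs without Airflow installed.

```python
import re

MAJOR_RE = re.compile(r"^\d+$")


def parse_major(raw):
    """Return the stripped major string, or '' when unset; raise otherwise.

    ValueError is a stand-in for AirflowConfigException in this sketch.
    """
    value = (raw or "").strip()
    if value and not MAJOR_RE.match(value):
        raise ValueError(f"es_compat_with must be an integer major version; got {raw!r}")
    return value


def negotiable_keys(headers):
    """Pick the Accept / Content-Type keys regardless of their casing."""
    return [key for key in headers if key.lower() in ("accept", "content-type")]


stripped = parse_major(" 8 ")
unset = parse_major(None)
keys = negotiable_keys({"Accept": "a", "content-type": "b", "x-opaque-id": "c"})
```

    Values such as `'v8'` or `'8.0'` raise immediately, which mirrors the
    fail-fast behaviour described in the first bullet.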
    
    Tests: add wire-level cases for cat APIs (`text/plain` preserved,
    `text/plain,application/...` partial rewrite), PascalCase headers,
    whitespace stripping, non-numeric major rejection, and a direct
    `conf.get -> None` branch (the existing parametrize folds into the
    provider yaml default `""` via `conf_vars`).
    
    * fixup(elasticsearch): import AirflowConfigException via common.compat.sdk
    
    The rest of this module already routes airflow imports through the
    `common.compat.sdk` shim (`conf` lives there), and the shim explicitly
    exports `AirflowConfigException` so the same provider build can target
    both Airflow 2 (`airflow.exceptions`) and Airflow 3 (`airflow.sdk.exceptions`).
    Switch the new exception import to the same shim so we don't pin to
    `airflow.exceptions` and silently break the Airflow 3 import path.
    
    * fixup(elasticsearch): satisfy CI lint, mypy and project-structure tests
    
    CI on the latest `main` merge surfaced four failures, all mechanical and
    fixed in this commit:
    
    1. **Static checks** (ruff / ruff-format / autogen):
       - `_compat.py` — D205 docstring rule wants the summary line on its own
         line, both for the module docstring and for `apply_compat_with`'s
         docstring. Reformatted both.
       - `hooks/elasticsearch.py` — collapsed the multi-line
         `apply_compat_with(Elasticsearch(...))` call into a single line
         (now under the line-length cap thanks to the basic_auth tuple sitting
         inside the existing parens).
       - `tests/.../test__compat.py` — collapsed two over-wrapped expressions
         (`captured.append({...})` in the spy, and the
         `assert wire_capture[-1][...] == "..."` in
         `test_apply_compat_with_strips_whitespace_in_config`).
       - `get_provider_info.py` — the autogenerated mirror of `provider.yaml`
         was missing the new `es_compat_with` config option entry. Added it
         with the same description / version_added / type / example / default
         as the yaml.
    
    2. **MyPy providers** (`Cannot assign to a method [method-assign]`):
       - `transport.perform_request = perform_request` (instance-level
         assignment) is rejected by mypy because elastic_transport's
         `Transport.perform_request` is bound at the class. Switched to
         `setattr(transport, "perform_request", perform_request)`, which
         mypy accepts and which preserves the exact same runtime behaviour
         (the idempotency guard at the top of the function still inspects
         `transport.__dict__["perform_request"]`, so repeat calls remain
         no-ops).
    
    3. **Non-DB tests core** and **Low dep tests core**
       (`test_project_structure.py::test_providers_modules_should_have_tests`):
       - The structure check expects the test file for source `_compat.py`
         (note the leading underscore) to be named `test__compat.py` (two
         underscores: `test_` + `_compat`). Renamed the file from
         `test_compat.py` → `test__compat.py` via `git mv` so the rest of
         git history follows.
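    
    The runtime behaviour described in item 2 (an instance-level override
    installed with `setattr`, guarded by a `__dict__` check) can be sketched
    as follows. `Transport` and `wrap_once` are hypothetical names; this is
    not the provider's helper, only the same mechanism in isolation.

```python
class Transport:
    """Hypothetical stand-in for elastic_transport.Transport."""

    def perform_request(self, *args, **kwargs):
        return ("original", args, kwargs)


def wrap_once(transport):
    """Install an instance-level override unless one is already present."""
    if "perform_request" in transport.__dict__:
        return False  # already wrapped: no-op
    original = transport.perform_request

    def perform_request(*args, **kwargs):
        return ("wrapped",) + original(*args, **kwargs)[1:]

    # setattr writes into the instance __dict__, shadowing the class method
    # for this object only.
    setattr(transport, "perform_request", perform_request)
    return True


t = Transport()
first = wrap_once(t)
second = wrap_once(t)
```

    The second call returns without wrapping again, and other `Transport`
    instances keep resolving `perform_request` to the class method.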
    
    Re-validated locally:
    - `ruff check` and `ruff format --check` pass on all four files.
    - mypy on `_compat.py` no longer reports the `method-assign` error
      (only an unrelated `airflow.__version__` attr-defined error from
      running mypy outside a real Airflow install — Airflow CI runs against
      an installed Airflow so this does not surface there).
    - Wire-level regression matrix re-run with elasticsearch-py 9.3.0 and
      the `setattr` variant: cat.help `text/plain` preserved, cat.indices
      partial rewrite preserved, search/bulk Accept and Content-Type
      rewritten to compat=8, idempotency guard still triggers, bad values
      rejected. 7/7 PASS.
    
    * Address review feedback: tolerant transport wrapper + clarified docs
    
    - Refactor apply_compat_with to use functools.wraps + *args, **kwargs so
      the wrapper survives future elastic_transport perform_request signature
      changes (new keyword-only params, reordered positionals) and preserves
      __name__/__doc__/__wrapped__ for introspection.
    
    - Extend the es_compat_with docs entry with explicit valid-value rules
      and a note that the fix is installed at the transport layer and
      therefore overrides elasticsearch-py's per-API-method header
      negotiation (constructor headers= does not work for this purpose).
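    
    A small sketch of the signature-tolerant wrapper pattern from the first
    bullet. The `original` function and its `retry` keyword are hypothetical
    and only play the role of a `perform_request` that later gains a
    parameter.

```python
import functools


def original(method, target, *, body=None, headers=None, retry=None):
    """Hypothetical perform_request; 'retry' plays the role of a newer keyword."""
    return {"method": method, "target": target, "headers": headers, "retry": retry}


@functools.wraps(original)
def wrapper(*args, **kwargs):
    # Touch only the headers keyword; everything else passes through as given.
    headers = kwargs.get("headers")
    if headers:
        kwargs["headers"] = {**headers, "x-seen-by-wrapper": "1"}
    return original(*args, **kwargs)


result = wrapper("GET", "/", headers={"accept": "text/plain"}, retry=3)
```

    Because the wrapper never names the parameters it does not care about,
    the `retry` keyword reaches `original` unchanged, and `functools.wraps`
    keeps `__name__`, `__doc__` and `__wrapped__` pointing at the original.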
    
    * Add 'misconfiguration' to spelling wordlist
    
    Used in providers/elasticsearch/docs/logging/index.rst to describe the
    fail-fast behavior when [elasticsearch] es_compat_with is set to an
    invalid value. The wordlist already contained 'misconfigured' but the
    noun form was missing, causing the --spellcheck-only docs build to fail.
    
    ---------
    
    Co-authored-by: Peter Cheon <[email protected]>
---
 docs/spelling_wordlist.txt                         |   1 +
 providers/elasticsearch/docs/changelog.rst         |  14 ++
 providers/elasticsearch/docs/logging/index.rst     |  42 ++++
 providers/elasticsearch/provider.yaml              |  14 ++
 .../src/airflow/providers/elasticsearch/_compat.py | 119 ++++++++++
 .../providers/elasticsearch/get_provider_info.py   |   7 +
 .../providers/elasticsearch/hooks/elasticsearch.py |   7 +-
 .../providers/elasticsearch/log/es_task_handler.py |   5 +-
 .../tests/unit/elasticsearch/test__compat.py       | 240 +++++++++++++++++++++
 9 files changed, 444 insertions(+), 5 deletions(-)

diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index b4460b21e75..26662d523a3 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1031,6 +1031,7 @@ milli
 millis
 milton
 minikube
+misconfiguration
 misconfigured
 Mixin
 mixin
diff --git a/providers/elasticsearch/docs/changelog.rst b/providers/elasticsearch/docs/changelog.rst
index 2209b6ff95b..3da8690238c 100644
--- a/providers/elasticsearch/docs/changelog.rst
+++ b/providers/elasticsearch/docs/changelog.rst
@@ -28,6 +28,20 @@ Changelog
 ---------
 
 
+A new ``[elasticsearch] es_compat_with`` config option lets operators pin
+the ``compatible-with`` HTTP content-negotiation level used by the
+Elasticsearch client. Since 6.5.1 the provider depends on
+``elasticsearch>=8.10,<10``, and a default install resolves to an
+``elasticsearch>=9`` client which unconditionally negotiates
+``compatible-with=9`` on every request. Elasticsearch 8.x servers reject
+that with HTTP 400 ``media_type_header_exception`` (regression introduced
+by #64070), breaking remote task log ingestion and the SQL/Python hooks
+against ES 8 clusters. Setting ``es_compat_with = "8"`` rewrites the
+client transport so every outbound request carries
+``compatible-with=8`` (and the matching ``+x-ndjson`` form for bulk
+requests), restoring compatibility without dropping ES 9 support. When
+unset, behavior is unchanged.
+
 6.5.3
 .....
 
diff --git a/providers/elasticsearch/docs/logging/index.rst b/providers/elasticsearch/docs/logging/index.rst
index 3254bf5406c..5cfae8d6230 100644
--- a/providers/elasticsearch/docs/logging/index.rst
+++ b/providers/elasticsearch/docs/logging/index.rst
@@ -93,6 +93,48 @@ Additionally, in the ``elasticsearch_configs`` section, you can pass any paramet
     api_key = "SOMEAPIKEY"
     verify_certs = True
 
+Pinning the ``compatible-with`` content-negotiation level
+'''''''''''''''''''''''''''''''''''''''''''''''''''''''''
+
+Since provider 6.5.1, the Elasticsearch dependency is ``elasticsearch>=8.10,<10``,
+which means a default install resolves to an ``elasticsearch>=9`` Python client.
+That client unconditionally negotiates ``compatible-with=9`` on every request,
+which Elasticsearch 8.x servers reject with HTTP 400
+``media_type_header_exception``. Both the task log writer and the
+``ElasticsearchSQLHook`` / ``ElasticsearchPythonHook`` are affected.
+
+If you need to keep a single Airflow image compatible with an
+``elasticsearch<9`` server, set ``[elasticsearch] es_compat_with`` to the server
+major version. The provider then rewrites the client transport so every outbound
+request carries ``Accept`` / ``Content-Type:
+application/vnd.elasticsearch+json; compatible-with=<major>`` (and the matching
+``+x-ndjson`` form for bulk requests):
+
+.. code-block:: ini
+
+    [elasticsearch]
+    es_compat_with = 8
+
+Only a positive integer major version is accepted (``"7"``, ``"8"``, ``"9"``);
+any other value (e.g. ``"v8"``, ``"8.0"``) fails fast with an
+``AirflowConfigException`` at client construction time so the misconfiguration
+is obvious in the worker startup log instead of producing a per-request 400
+storm.
+
+.. note::
+
+   The fix is installed at the **transport layer** (a wrapper around
+   ``client.transport.perform_request``) and therefore overrides the
+   per-API-method ``Accept`` / ``Content-Type`` headers that elasticsearch-py
+   negotiates from its own client major. Constructor-level ``headers=`` on
+   ``Elasticsearch.__init__`` and the ``elasticsearch_configs`` section do
+   **not** work for this purpose — elasticsearch-py re-applies its own
+   ``compatible-with=<client_major>`` headers right before the request goes
+   out, after any constructor headers.
+
+When the option is unset the client behaves as before (negotiating its own
+major version).
+
 .. _elasticsearch-document-schema:
 
 Expected Elasticsearch document schema
diff --git a/providers/elasticsearch/provider.yaml b/providers/elasticsearch/provider.yaml
index 5fff75d12ef..822580e58c8 100644
--- a/providers/elasticsearch/provider.yaml
+++ b/providers/elasticsearch/provider.yaml
@@ -221,6 +221,20 @@ config:
         type: string
         example: ~
         default: "1000"
+      es_compat_with:
+        description: |
+          Pin the ``compatible-with`` HTTP content-negotiation level used by the
+          Elasticsearch client. Accepts a server major version string (e.g. ``"7"``,
+          ``"8"``, ``"9"``). When unset, the elasticsearch-py client negotiates its
+          own major version, which makes an ``elasticsearch>=9`` client (the default
+          for fresh installs) incompatible with Elasticsearch 8.x servers — every
+          request is rejected with HTTP 400 ``media_type_header_exception``.
+          Setting this option keeps a single Airflow image compatible with both
+          ``elasticsearch<9`` and ``elasticsearch>=9`` servers.
+        version_added: 6.5.4
+        type: string
+        example: "8"
+        default: ""
   elasticsearch_configs:
     description: ~
     options:
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/_compat.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/_compat.py
new file mode 100644
index 00000000000..ca888db8a6f
--- /dev/null
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/_compat.py
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Helpers shared between the Elasticsearch hooks and log handler.
+
+Currently this exposes a single helper, :func:`apply_compat_with`, that lets the
+provider keep working against an Elasticsearch server whose major version does
+not match the installed ``elasticsearch`` Python client major. See the helper
+docstring for the regression context.
+"""
+
+from __future__ import annotations
+
+import functools
+import re
+from typing import TYPE_CHECKING
+
+from airflow.providers.common.compat.sdk import AirflowConfigException, conf
+
+if TYPE_CHECKING:
+    import elasticsearch
+
+
+# Matches the JSON / NDJSON / mapbox vector-tile mimetypes the ``elasticsearch``
+# client negotiates, in either form: the raw ``application/json`` (what the
+# generated client code writes into ``__headers``) and the already-rewritten
+# ``application/vnd.elasticsearch+json; compatible-with=<N>`` (what
+# ``mimetype_header_to_compat`` in ``elasticsearch/_sync/client/_base.py``
+# emits before handing the request to the transport). Both forms must be
+# rewritten to ``compatible-with=<configured>``; anything else (notably
+# ``text/plain`` used by the cat APIs) is left intact, mirroring upstream's
+# selective substitution behaviour.
+_COMPAT_MIMETYPE_RE = re.compile(
+    r"application/(?:vnd\.elasticsearch\+)?(json|x-ndjson|vnd\.mapbox-vector-tile)"
+    r"(?:\s*;\s*compatible-with=\d+)?"
+)
+_COMPAT_MAJOR_RE = re.compile(r"^\d+$")
+
+
+def apply_compat_with(client: elasticsearch.Elasticsearch) -> elasticsearch.Elasticsearch:
+    """
+    Pin the ``compatible-with`` HTTP content-negotiation level for ``client``.
+
+    The ``elasticsearch`` Python client always negotiates ``compatible-with=<client_major>``
+    on every request; an Elasticsearch server with a different major version rejects
+    the request with HTTP 400 ``media_type_header_exception``. This is what happens
+    when an ``elasticsearch>=9`` client (the current default) talks to an
+    Elasticsearch 8.x server, which broke remote logging for ES 8 deployments
+    starting with provider 6.5.1.
+
+    When ``[elasticsearch] es_compat_with`` is set to a major version string
+    (e.g. ``"7"``, ``"8"``, ``"9"``) this helper wraps the client's transport
+    so every outbound request rewrites the ``compatible-with=<N>`` parameter on
+    the JSON / NDJSON / mapbox vector-tile parts of the ``Accept`` and
+    ``Content-Type`` headers. Non-JSON parts (notably ``text/plain`` used by
+    the cat APIs) are preserved verbatim, mirroring how elasticsearch-py's own
+    ``mimetype_header_to_compat`` handles content negotiation.
+
+    When the option is unset the client is returned unchanged and behaves
+    exactly as before.
+    """
+    raw = conf.get("elasticsearch", "es_compat_with", fallback=None)
+    compat = (raw or "").strip()
+    if not compat:
+        return client
+    if not _COMPAT_MAJOR_RE.match(compat):
+        raise AirflowConfigException(
+            "[elasticsearch] es_compat_with must be a positive integer major version "
+            f"(e.g. '7', '8', '9'); got {raw!r}."
+        )
+
+    transport = client.transport
+    if "perform_request" in transport.__dict__:
+        # Already wrapped on this transport instance — no-op so repeated calls
+        # to ``apply_compat_with`` (e.g. across hook reuse) stay idempotent.
+        return client
+
+    sub = rf"application/vnd.elasticsearch+\g<1>; compatible-with={compat}"
+    original_perform_request = transport.perform_request
+
+    # Accept ``*args, **kwargs`` so the wrapper survives future elastic_transport
+    # ``perform_request`` signature changes (new keyword-only params, reordered
+    # positionals). ``functools.wraps`` preserves the original ``__name__`` /
+    # ``__doc__`` / ``__wrapped__`` for tooling and introspection.
+    @functools.wraps(original_perform_request)
+    def perform_request(*args, **kwargs):  # type: ignore[no-untyped-def]
+        headers = kwargs.get("headers")
+        if not headers:
+            return original_perform_request(*args, **kwargs)
+        # Walk every key case-insensitively so a future elastic_transport that
+        # forwards PascalCase headers does not silently bypass the rewrite.
+        merged = dict(headers)
+        for key in list(merged):
+            if key.lower() in ("accept", "content-type") and merged[key]:
+                merged[key] = _COMPAT_MIMETYPE_RE.sub(sub, merged[key])
+        kwargs["headers"] = merged
+        return original_perform_request(*args, **kwargs)
+
+    # ``setattr`` instead of direct attribute assignment so mypy does not flag a
+    # ``method-assign`` error — we are *intentionally* shadowing the bound method
+    # at the instance level (the idempotency guard above checks the instance
+    # ``__dict__``).
+    setattr(transport, "perform_request", perform_request)
+    return client
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py
index 2d357cecb19..b0853a98580 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/get_provider_info.py
@@ -151,6 +151,13 @@ def get_provider_info():
                         "example": None,
                         "default": "1000",
                     },
+                    "es_compat_with": {
+                        "description": 'Pin the ``compatible-with`` HTTP content-negotiation level used by the\nElasticsearch client. Accepts a server major version string (e.g. ``"7"``,\n``"8"``, ``"9"``). When unset, the elasticsearch-py client negotiates its\nown major version, which makes an ``elasticsearch>=9`` client (the default\nfor fresh installs) incompatible with Elasticsearch 8.x servers — every\nrequest is rejected with HTTP 400 ``media_type_header_exception``.\nSetting this [...]
+                        "version_added": "6.5.4",
+                        "type": "string",
+                        "example": "8",
+                        "default": "",
+                    },
                 },
             },
             "elasticsearch_configs": {
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
index 4330740fd96..fa4895b37e4 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -28,6 +28,7 @@ from elasticsearch import Elasticsearch
 
 from airflow.providers.common.compat.sdk import BaseHook
 from airflow.providers.common.sql.hooks.sql import DbApiHook
+from airflow.providers.elasticsearch._compat import apply_compat_with
 
 if TYPE_CHECKING:
     from elastic_transport import ObjectApiResponse
@@ -171,9 +172,9 @@ class ESConnection:
         netloc = f"{host}:{port}"
         self.url = parse.urlunparse((scheme, netloc, "/", None, None, None))
         if user and password:
-            self.es = Elasticsearch(self.url, basic_auth=(user, password), **kwargs)
+            self.es = apply_compat_with(Elasticsearch(self.url, basic_auth=(user, password), **kwargs))
         else:
-            self.es = Elasticsearch(self.url, **kwargs)
+            self.es = apply_compat_with(Elasticsearch(self.url, **kwargs))
 
     def cursor(self) -> ElasticsearchSQLCursor:
         return ElasticsearchSQLCursor(self.es, **self.kwargs)
@@ -283,7 +284,7 @@ class ElasticsearchPythonHook(BaseHook):
 
     def _get_elastic_connection(self):
         """Return the Elasticsearch client."""
-        client = Elasticsearch(self.hosts, **self.es_conn_args)
+        client = apply_compat_with(Elasticsearch(self.hosts, **self.es_conn_args))
 
         return client
 
diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
index eb531452df8..402d64e5bf1 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -43,6 +43,7 @@ from elasticsearch.exceptions import NotFoundError
 import airflow.logging_config as alc
 from airflow.models.dagrun import DagRun
 from airflow.providers.common.compat.sdk import conf
+from airflow.providers.elasticsearch._compat import apply_compat_with
 from airflow.providers.elasticsearch.log.es_json_formatter import ElasticsearchJSONFormatter
 from airflow.providers.elasticsearch.log.es_response import ElasticSearchResponse, Hit, resolve_nested
 from airflow.providers.elasticsearch.version_compat import AIRFLOW_V_3_0_PLUS, AIRFLOW_V_3_2_PLUS
@@ -269,7 +270,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix
         )
         self.closed = False
 
-        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.client = apply_compat_with(elasticsearch.Elasticsearch(self.host, **es_kwargs))
         # in airflow.cfg, host of elasticsearch has to be http://dockerhostXxxx:9200
 
         self.frontend = frontend
@@ -651,7 +652,7 @@ class ElasticsearchRemoteLogIO(LoggingMixin):  # noqa: D101
 
     def __attrs_post_init__(self):
         es_kwargs = get_es_kwargs_from_config()
-        self.client = elasticsearch.Elasticsearch(self.host, **es_kwargs)
+        self.client = apply_compat_with(elasticsearch.Elasticsearch(self.host, **es_kwargs))
         self.index_patterns_callable = conf.get("elasticsearch", "index_patterns_callable", fallback="")
         self.PAGE = 0
         self.MAX_LINE_PER_PAGE = conf.getint("elasticsearch", "max_lines_per_page", fallback=1000)
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/test__compat.py b/providers/elasticsearch/tests/unit/elasticsearch/test__compat.py
new file mode 100644
index 00000000000..7f81f9b92f0
--- /dev/null
+++ b/providers/elasticsearch/tests/unit/elasticsearch/test__compat.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Wire-level tests for ``airflow.providers.elasticsearch._compat.apply_compat_with``.
+
+These tests intercept ``elastic_transport.Transport.perform_request`` so they
+observe the exact ``Accept`` / ``Content-Type`` headers the helper produces.
+Asserting on ``client._headers`` (or any other in-memory state on the client)
+would not be enough: the elasticsearch-py client always re-applies its own
+per-API-method content-negotiation headers right before the request goes out,
+which is the very behaviour this helper has to override. Only what the
+``Transport`` sees on the wire matters.
+"""
+
+from __future__ import annotations
+
+import contextlib
+
+import pytest
+from elastic_transport import Transport
+from elasticsearch import Elasticsearch
+
+from airflow.providers.common.compat.sdk import AirflowConfigException
+from airflow.providers.elasticsearch._compat import apply_compat_with
+
+from tests_common.test_utils.config import conf_vars
+
+
+def _trigger_calls(client: Elasticsearch) -> None:
+    """Drive ``search``, ``info`` and ``bulk`` against the spy transport.
+
+    Each call hits the spy and raises; we swallow that and rely on the spy's
+    ``captured`` list to make assertions.
+    """
+    for action in (
+        lambda: client.search(index="airflow-logs", query={"match_all": {}}),
+        lambda: client.info(),
+        lambda: client.bulk(operations=[{"index": {"_index": "x"}}, {"hello": "world"}]),
+    ):
+        with contextlib.suppress(RuntimeError):
+            action()
+
+
+@pytest.fixture
+def wire_capture(monkeypatch):
+    """Replace ``Transport.perform_request`` with a recording spy.
+
+    The spy is installed at the *class* level (not on an instance) so it is
+    picked up by both the original transport and by the wrapper produced by
+    :func:`apply_compat_with`. The wrapper resolves the original
+    ``perform_request`` at wrap time, so the spy must already be in place when
+    ``apply_compat_with`` runs.
+    """
+    captured: list[dict] = []
+
+    def spy(self, method, target, *, body=None, headers=None, **kwargs):
+        captured.append({"method": method, "target": target, "headers": dict(headers or {})})
+        raise RuntimeError("captured")
+
+    monkeypatch.setattr(Transport, "perform_request", spy)
+    return captured
+
+
+@pytest.mark.parametrize("unset_value", ["", None])
+def test_apply_compat_with_unset_does_not_wrap_transport(unset_value):
+    """When the option is unset the helper returns the client untouched.
+
+    The wrap installs ``perform_request`` as an instance attribute on the
+    transport, so the most precise way to assert the helper is a no-op is to
+    check that no such instance attribute was set (i.e. lookup still resolves
+    to the class method). Identity (``is``) comparison on
+    ``transport.perform_request`` would not work even in the no-op case
+    because every attribute access on a method creates a fresh bound-method
+    object.
+
+    Note: ``conf_vars`` removes the override when the value is ``None``, so
+    both parametrized cases ultimately resolve to the provider yaml default
+    (``""``). The parametrize is kept to document that callers passing either
+    sentinel get the no-op path; the actual ``None`` branch in the helper is
+    covered by ``test_apply_compat_with_unset_via_missing_conf`` below.
+    """
+    client = Elasticsearch("http://localhost:9200")
+    assert "perform_request" not in client.transport.__dict__
+    with conf_vars({("elasticsearch", "es_compat_with"): unset_value}):
+        same = apply_compat_with(client)
+    assert same is client
+    assert "perform_request" not in client.transport.__dict__
+
+
+def test_apply_compat_with_unset_via_missing_conf(monkeypatch):
+    """Cover the ``conf.get`` returning ``None`` branch directly."""
+    from airflow.providers.elasticsearch import _compat
+
+    monkeypatch.setattr(_compat.conf, "get", lambda *args, **kwargs: None)
+    client = Elasticsearch("http://localhost:9200")
+    assert apply_compat_with(client) is client
+    assert "perform_request" not in client.transport.__dict__
+
+
+def test_apply_compat_with_pins_compatible_with_8(wire_capture):
+    """With ``es_compat_with = "8"`` every outbound call carries ``compatible-with=8``."""
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    _trigger_calls(client)
+
+    assert {c["method"] for c in wire_capture} == {"POST", "GET", "PUT"}
+    expected_json = "application/vnd.elasticsearch+json; compatible-with=8"
+    expected_ndjson = "application/vnd.elasticsearch+x-ndjson; compatible-with=8"
+
+    by_method = {c["method"]: c for c in wire_capture}
+    assert by_method["POST"]["headers"]["accept"] == expected_json
+    assert by_method["POST"]["headers"]["content-type"] == expected_json
+    assert by_method["GET"]["headers"]["accept"] == expected_json
+    # ``info()`` does not send a body, so content-type is absent on the wire
+    assert by_method["GET"]["headers"].get("content-type") in (None, "")
+    assert by_method["PUT"]["headers"]["accept"] == expected_json
+    # bulk preserves the ndjson form so the server can stream the body
+    assert by_method["PUT"]["headers"]["content-type"] == expected_ndjson
+
+
+def test_apply_compat_with_pins_compatible_with_7(wire_capture):
+    """The helper accepts arbitrary major version strings, not just ``"8"``."""
+    with conf_vars({("elasticsearch", "es_compat_with"): "7"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    _trigger_calls(client)
+    assert wire_capture, "spy should have captured at least one call"
+    assert all(
+        c["headers"]["accept"] == "application/vnd.elasticsearch+json; compatible-with=7"
+        for c in wire_capture
+    )
+
+
+def test_apply_compat_with_preserves_text_plain_for_cat_apis(wire_capture):
+    """Cat APIs send ``Accept: text/plain[,application/json]``; the wrapper must
+    preserve the ``text/plain`` part. Earlier revisions of the helper unconditionally
+    rewrote ``accept`` to ``application/vnd.elasticsearch+json; compatible-with=N``,
+    which silently turned every ``cat.*`` response into JSON instead of plain text.
+
+    We mirror elasticsearch-py's own ``mimetype_header_to_compat`` (only
+    ``application/(json|x-ndjson|vnd.mapbox-vector-tile)`` parts get the
+    ``compatible-with`` suffix), so this test fails fast if anyone reverts to the
+    blanket overwrite.
+    """
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    for action in (lambda: client.cat.help(), lambda: client.cat.indices()):
+        with contextlib.suppress(RuntimeError):
+            action()
+
+    accepts = [c["headers"].get("accept") for c in wire_capture]
+    # ``cat.help`` ships ``text/plain`` only; it must come through verbatim.
+    assert "text/plain" in accepts, accepts
+    # ``cat.indices`` ships ``text/plain,application/json``; the JSON half gets
+    # the ``compatible-with=8`` suffix, the text/plain half stays put.
+    assert any(
+        a and a.startswith("text/plain,application/vnd.elasticsearch+json; compatible-with=8")
+        for a in accepts
+    ), accepts
+
+
+def test_apply_compat_with_handles_pascal_case_headers(monkeypatch):
+    """Defensive: if ``elastic_transport`` ever forwards PascalCase header keys,
+    the rewrite must still apply (a lowercase-only ``dict.get`` would silently
+    no-op and let ``compatible-with=<client_major>`` ship to the server).
+    """
+    seen: dict = {}
+
+    def spy(self, method, target, *, body=None, headers=None, **kwargs):
+        seen["headers"] = dict(headers or {})
+        raise RuntimeError("captured")
+
+    monkeypatch.setattr(Transport, "perform_request", spy)
+
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    # Drive the wrapper with PascalCase keys directly — bypassing the
+    # ``_BaseClient.perform_request`` normalization.
+    with contextlib.suppress(RuntimeError):
+        client.transport.perform_request(
+            "GET",
+            "/",
+            headers={"Accept": "application/json", "Content-Type": "application/json"},
+        )
+
+    assert seen["headers"]["Accept"] == "application/vnd.elasticsearch+json; compatible-with=8"
+    assert seen["headers"]["Content-Type"] == "application/vnd.elasticsearch+json; compatible-with=8"
+
+
+def test_apply_compat_with_strips_whitespace_in_config(wire_capture):
+    """Operators occasionally write ``es_compat_with = " 8"``; the helper must
+    strip whitespace before interpolating into the wire header, otherwise the
+    server returns 400 and the helper fails open in a confusing way.
+    """
+    with conf_vars({("elasticsearch", "es_compat_with"): " 8 "}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+    with contextlib.suppress(RuntimeError):
+        client.search(index="airflow-logs", query={"match_all": {}})
+
+    assert wire_capture[-1]["headers"]["accept"] == "application/vnd.elasticsearch+json; compatible-with=8"
+
+
+@pytest.mark.parametrize("bad_value", ["v8", "8.0", "abc", "8;9"])
+def test_apply_compat_with_rejects_non_numeric_major(bad_value):
+    """A non-numeric ``es_compat_with`` would otherwise produce malformed wire
+    headers (``compatible-with=v8``) and a per-request 400 storm. Fail fast at
+    construction time with a config exception so the misconfiguration is
+    obvious in the worker startup log.
+    """
+    with conf_vars({("elasticsearch", "es_compat_with"): bad_value}):
+        with pytest.raises(AirflowConfigException, match="es_compat_with"):
+            apply_compat_with(Elasticsearch("http://localhost:9200"))
+
+
+def test_apply_compat_with_is_idempotent():
+    """Calling ``apply_compat_with`` twice on the same client only wraps once."""
+    with conf_vars({("elasticsearch", "es_compat_with"): "8"}):
+        client = apply_compat_with(Elasticsearch("http://localhost:9200"))
+        first_wrapper = client.transport.__dict__["perform_request"]
+        apply_compat_with(client)
+        second_wrapper = client.transport.__dict__["perform_request"]
+    assert first_wrapper is second_wrapper
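
Editor's note (not part of the commit): the tests above rely on a few
properties of the wrapper: it is installed as an instance attribute on the
transport, it rewrites only JSON-like MIME parts, it matches header keys
case-insensitively, it validates the configured major, and it is idempotent.
The sketch below illustrates those properties in isolation. It is a minimal
sketch under assumptions, not the provider's implementation: FakeTransport
and pin_compat are hypothetical names, and the regular expression is an
assumed approximation of the rewrite rule described in the docstrings.

```python
import re

# Assumed rewrite rule: only JSON-like application/* parts are touched,
# whether or not they already carry a compatible-with suffix.
_COMPAT_RE = re.compile(
    r"application/(?:vnd\.elasticsearch\+)?"
    r"(json|x-ndjson|vnd\.mapbox-vector-tile)"
    r"(?:;\s*compatible-with=\d+)?"
)
_REWRITTEN = {"accept", "content-type"}


class FakeTransport:
    """Hypothetical stand-in for a transport; records the headers it receives."""

    def __init__(self):
        self.sent = []

    def perform_request(self, method, target, *, headers=None, **kwargs):
        self.sent.append(dict(headers or {}))


def pin_compat(transport, major):
    """Install a per-instance wrapper that pins compatible-with=<major>."""
    major = major.strip()
    if not major.isdigit():
        # Fail fast instead of emitting a malformed header on every request.
        raise ValueError(f"expected a numeric major version, got {major!r}")
    if "perform_request" in transport.__dict__:
        return transport  # already wrapped, so a second call is a no-op

    inner = transport.perform_request  # bound class method, captured at wrap time
    replacement = rf"application/vnd.elasticsearch+\g<1>; compatible-with={major}"

    def wrapper(method, target, *, headers=None, **kwargs):
        # Compare keys case-insensitively but keep the caller's spelling.
        fixed = {
            key: _COMPAT_RE.sub(replacement, value) if key.lower() in _REWRITTEN else value
            for key, value in (headers or {}).items()
        }
        return inner(method, target, headers=fixed, **kwargs)

    # The instance attribute shadows the class method on lookup.
    transport.perform_request = wrapper
    return transport


transport = pin_compat(FakeTransport(), " 8 ")
transport.perform_request(
    "GET", "/_cat/indices", headers={"Accept": "text/plain,application/json"}
)
print(transport.sent[-1]["Accept"])
```

Because the wrapper is stored in the instance __dict__, looking it up twice
yields the same object, which is what makes the identity check in the
idempotency test meaningful. The unwrapped class method, by contrast,
produces a new bound-method object on every access.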

