This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 97839f7b0a8 Replace pickle with json serialization for http triggers (#61662)
97839f7b0a8 is described below

commit 97839f7b0a8ae66d6079bb7fad5a363068f61617
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Feb 12 03:37:53 2026 +0530

    Replace pickle with json serialization for http triggers (#61662)
    
    * Replace pickle with json serialization for http triggers
    
    * fixing CI tests
    
    * no need to handle compat
    
    * adding in changelog
    
    * adding in changelog and updating to 6.0.0
    
---
 providers/http/docs/changelog.rst                  | 14 ++++++
 providers/http/docs/index.rst                      |  6 +--
 providers/http/provider.yaml                       |  1 +
 providers/http/pyproject.toml                      |  6 +--
 .../http/src/airflow/providers/http/__init__.py    |  2 +-
 .../src/airflow/providers/http/operators/http.py   |  6 +--
 .../src/airflow/providers/http/triggers/http.py    | 50 ++++++++++++++++++++--
 .../http/tests/unit/http/operators/test_http.py    | 24 +++++++++--
 .../http/tests/unit/http/triggers/test_http.py     | 16 +++----
 9 files changed, 100 insertions(+), 25 deletions(-)

diff --git a/providers/http/docs/changelog.rst b/providers/http/docs/changelog.rst
index b82db282641..497cb2d0929 100644
--- a/providers/http/docs/changelog.rst
+++ b/providers/http/docs/changelog.rst
@@ -27,6 +27,20 @@
 Changelog
 ---------
 
+6.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+.. warning::
+  The HTTP provider now uses JSON-based serialization for HTTP responses in deferred tasks
+  instead of pickle. Deferred HTTP tasks from previous provider versions will fail with a
+  ``TypeError`` after upgrade.
+
+  Before upgrading, ensure all HTTP tasks with ``deferrable=True`` that are currently in
+  ``deferred`` state have completed or been cleared.
+
 5.6.4
 .....
 
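To act on the warning above before upgrading, something like the following can locate HTTP
task instances still in the deferred state. This is a minimal sketch against Airflow's ORM
session helpers, not part of this commit; the operator-name filter assumes the tasks use the
stock HttpOperator class name.

    # Sketch only (not part of this commit): list HttpOperator task instances
    # still in the "deferred" state before upgrading the provider.
    from airflow.models import TaskInstance
    from airflow.utils.session import create_session
    from airflow.utils.state import TaskInstanceState

    with create_session() as session:
        deferred = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.state == TaskInstanceState.DEFERRED,
                # Assumption: deferrable HTTP tasks use the stock class name.
                TaskInstance.operator == "HttpOperator",
            )
            .all()
        )
        for ti in deferred:
            print(ti.dag_id, ti.task_id, ti.run_id)
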
diff --git a/providers/http/docs/index.rst b/providers/http/docs/index.rst
index 82127b2cbae..20bd0f9951d 100644
--- a/providers/http/docs/index.rst
+++ b/providers/http/docs/index.rst
@@ -78,7 +78,7 @@ apache-airflow-providers-http package
 `Hypertext Transfer Protocol (HTTP) <https://www.w3.org/Protocols/>`__
 
 
-Release: 5.6.4
+Release: 6.0.0
 
 Provider package
 ----------------
@@ -134,5 +134,5 @@ Downloading official packages
 You can download officially released packages and verify their checksums and signatures from the
 `Official Apache Download site <https://downloads.apache.org/airflow/providers/>`_
 
-* `The apache-airflow-providers-http 5.6.4 sdist package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4.tar.gz>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4.tar.gz.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4.tar.gz.sha512>`__)
-* `The apache-airflow-providers-http 5.6.4 wheel package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4-py3-none-any.whl>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4-py3-none-any.whl.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4-py3-none-any.whl.sha512>`__)
+* `The apache-airflow-providers-http 6.0.0 sdist package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0.tar.gz>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0.tar.gz.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0.tar.gz.sha512>`__)
+* `The apache-airflow-providers-http 6.0.0 wheel package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0-py3-none-any.whl>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0-py3-none-any.whl.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0-py3-none-any.whl.sha512>`__)
diff --git a/providers/http/provider.yaml b/providers/http/provider.yaml
index 371886af80f..a5ad9baa1db 100644
--- a/providers/http/provider.yaml
+++ b/providers/http/provider.yaml
@@ -28,6 +28,7 @@ source-date-epoch: 1769537221
 # In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have
 # to be done in the same PR
 versions:
+  - 6.0.0
   - 5.6.4
   - 5.6.3
   - 5.6.2
diff --git a/providers/http/pyproject.toml b/providers/http/pyproject.toml
index 47dc55c7d26..0260e850eb4 100644
--- a/providers/http/pyproject.toml
+++ b/providers/http/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
 
 [project]
 name = "apache-airflow-providers-http"
-version = "5.6.4"
+version = "6.0.0"
 description = "Provider package apache-airflow-providers-http for Apache Airflow"
 readme = "README.rst"
 license = "Apache-2.0"
@@ -103,8 +103,8 @@ apache-airflow-providers-common-sql = {workspace = true}
 apache-airflow-providers-standard = {workspace = true}
 
 [project.urls]
-"Documentation" = 
"https://airflow.apache.org/docs/apache-airflow-providers-http/5.6.4";
-"Changelog" = 
"https://airflow.apache.org/docs/apache-airflow-providers-http/5.6.4/changelog.html";
+"Documentation" = 
"https://airflow.apache.org/docs/apache-airflow-providers-http/6.0.0";
+"Changelog" = 
"https://airflow.apache.org/docs/apache-airflow-providers-http/6.0.0/changelog.html";
 "Bug Tracker" = "https://github.com/apache/airflow/issues";
 "Source Code" = "https://github.com/apache/airflow";
 "Slack Chat" = "https://s.apache.org/airflow-slack";
diff --git a/providers/http/src/airflow/providers/http/__init__.py b/providers/http/src/airflow/providers/http/__init__.py
index c6da94ab750..21b720ad23f 100644
--- a/providers/http/src/airflow/providers/http/__init__.py
+++ b/providers/http/src/airflow/providers/http/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
 
 __all__ = ["__version__"]
 
-__version__ = "5.6.4"
+__version__ = "6.0.0"
 
 if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
     "2.11.0"
diff --git a/providers/http/src/airflow/providers/http/operators/http.py b/providers/http/src/airflow/providers/http/operators/http.py
index d349b4228a1..1c5da7688e5 100644
--- a/providers/http/src/airflow/providers/http/operators/http.py
+++ b/providers/http/src/airflow/providers/http/operators/http.py
@@ -17,8 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import base64
-import pickle
 from collections.abc import Callable, Sequence
 from typing import TYPE_CHECKING, Any
 
@@ -26,7 +24,7 @@ from aiohttp import BasicAuth
 from requests import Response
 
 from airflow.providers.common.compat.sdk import AirflowException, BaseHook, BaseOperator, conf
-from airflow.providers.http.triggers.http import HttpTrigger, serialize_auth_type
+from airflow.providers.http.triggers.http import HttpResponseSerializer, HttpTrigger, serialize_auth_type
 from airflow.utils.helpers import merge_dicts
 
 if TYPE_CHECKING:
@@ -286,7 +284,7 @@ class HttpOperator(BaseOperator):
         Relies on trigger to throw an exception, otherwise it assumes execution was successful.
         """
         if event["status"] == "success":
-            response = pickle.loads(base64.standard_b64decode(event["response"]))
+            response = HttpResponseSerializer.deserialize(event["response"])
 
             self.paginate_async(context=context, response=response, previous_responses=paginated_responses)
             return self.process_response(context=context, response=response)
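For context, here are the trigger event payload shapes before and after this change. The
values are invented for illustration, but the dict layout mirrors the
HttpResponseSerializer.serialize() method added in this commit.

    # Illustrative payload shapes only; field values are made up.
    import base64

    # Old format: an opaque base64-encoded pickle string.
    old_event = {
        "status": "success",
        "response": base64.standard_b64encode(b"pickled-bytes").decode("ascii"),
    }

    # New format: a JSON-serializable dict. execute_complete now raises a
    # TypeError on the old string form, which is the breaking change noted
    # in the changelog.
    new_event = {
        "status": "success",
        "response": {
            "status_code": 200,
            "headers": {"Content-Type": "text/plain"},
            "content": base64.standard_b64encode(b"content").decode("ascii"),
            "url": "https://example.com/",
            "reason": "OK",
            "encoding": "utf-8",
            "cookies": {},
            "history": [],
        },
    }
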
diff --git a/providers/http/src/airflow/providers/http/triggers/http.py b/providers/http/src/airflow/providers/http/triggers/http.py
index 4323f0621fb..ec77d2634ee 100644
--- a/providers/http/src/airflow/providers/http/triggers/http.py
+++ b/providers/http/src/airflow/providers/http/triggers/http.py
@@ -20,7 +20,6 @@ import asyncio
 import base64
 import importlib
 import inspect
-import pickle
 import sys
 from collections.abc import AsyncIterator
 from importlib import import_module
@@ -61,6 +60,51 @@ def deserialize_auth_type(path: str | None) -> type | None:
     return getattr(import_module(module_path), cls_name)
 
 
+class HttpResponseSerializer:
+    """Serializer for requests.Response objects used in deferred HTTP tasks."""
+
+    @staticmethod
+    def serialize(response: requests.Response) -> dict[str, Any]:
+        """Convert a requests.Response object to a JSON serializable 
dictionary."""
+        return {
+            "status_code": response.status_code,
+            "headers": dict(response.headers),
+            "content": 
base64.standard_b64encode(response.content).decode("ascii"),
+            "url": response.url,
+            "reason": response.reason,
+            "encoding": response.encoding,
+            "cookies": {k: v for k, v in response.cookies.items()},
+            "history": [HttpResponseSerializer.serialize(h) for h in 
response.history],
+        }
+
+    @staticmethod
+    def deserialize(data: dict[str, Any] | str) -> requests.Response:
+        """Reconstruct a requests.Response object from serialized data."""
+        if isinstance(data, str):
+            raise TypeError("Response data must be a dict, got str")
+
+        if not isinstance(data, dict):
+            raise TypeError(f"Expected dict, got {type(data).__name__}")
+
+        response = requests.Response()
+        response.status_code = data["status_code"]
+        response.headers = CaseInsensitiveDict(data["headers"])
+        response._content = base64.standard_b64decode(data["content"])
+        response.url = data["url"]
+        response.reason = data.get("reason", "")
+        response.encoding = data.get("encoding")
+
+        cookies = RequestsCookieJar()
+        for name, value in data.get("cookies", {}).items():
+            cookies.set(name, str(value))
+        response.cookies = cookies
+
+        if data.get("history"):
+            response.history = [HttpResponseSerializer.deserialize(hist) for hist in data["history"]]
+
+        return response
+
+
 class HttpTrigger(BaseTrigger):
     """
     HttpTrigger run on the trigger worker.
@@ -121,7 +165,7 @@ class HttpTrigger(BaseTrigger):
             yield TriggerEvent(
                 {
                     "status": "success",
-                    "response": 
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+                    "response": HttpResponseSerializer.serialize(response),
                 }
             )
         except Exception as e:
@@ -301,7 +345,7 @@ class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
             yield TriggerEvent(
                 {
                     "status": "success",
-                    "response": 
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+                    "response": HttpResponseSerializer.serialize(response),
                 }
             )
         except Exception as e:
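
A minimal round-trip sketch of the serializer added above, assuming the provider is
importable; the Response fields set here are just enough for serialize() to have something
to work with, and none of this is part of the commit itself.

    # Minimal round-trip sketch; not part of this commit.
    import requests
    from airflow.providers.http.triggers.http import HttpResponseSerializer

    response = requests.Response()
    response.status_code = 200
    response._content = b"content"  # set the private buffer directly, as the tests do
    response.url = "https://example.com/"

    payload = HttpResponseSerializer.serialize(response)    # JSON-safe dict
    restored = HttpResponseSerializer.deserialize(payload)  # fresh requests.Response
    assert restored.status_code == 200
    assert restored.content == b"content"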
diff --git a/providers/http/tests/unit/http/operators/test_http.py b/providers/http/tests/unit/http/operators/test_http.py
index 09fae893bbf..50b66e5f612 100644
--- a/providers/http/tests/unit/http/operators/test_http.py
+++ b/providers/http/tests/unit/http/operators/test_http.py
@@ -36,7 +36,7 @@ from airflow.models import Connection
 from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
 from airflow.providers.http.hooks.http import HttpHook
 from airflow.providers.http.operators.http import HttpOperator
-from airflow.providers.http.triggers.http import HttpTrigger, serialize_auth_type
+from airflow.providers.http.triggers.http import HttpResponseSerializer, HttpTrigger, serialize_auth_type
 
 
 @mock.patch.dict("os.environ", AIRFLOW_CONN_HTTP_EXAMPLE="http://www.example.com")
@@ -124,11 +124,29 @@ class TestHttpOperator:
             context={},
             event={
                 "status": "success",
-                "response": 
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+                "response": HttpResponseSerializer.serialize(response),
             },
         )
         assert result == "content"
 
+    def test_async_execute_legacy_pickle_format_raise_error(self):
+        """Test error raised with legacy pickle format."""
+        operator = HttpOperator(
+            task_id="test_HTTP_op",
+            deferrable=True,
+        )
+        response = Response()
+        response._content = b"content"
+
+        with pytest.raises(TypeError, match="Response data must be a dict, got str"):
+            _ = operator.execute_complete(
+                context={},
+                event={
+                    "status": "success",
+                    "response": 
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+                },
+            )
+
     @pytest.mark.parametrize(
         (
             "data",
@@ -223,7 +241,7 @@ class TestHttpOperator:
                 context={},
                 event={
                     "status": "success",
-                    "response": 
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+                    "response": HttpResponseSerializer.serialize(response),
                 },
             )
 
diff --git a/providers/http/tests/unit/http/triggers/test_http.py b/providers/http/tests/unit/http/triggers/test_http.py
index 0e47c5559c1..e3338674af4 100644
--- a/providers/http/tests/unit/http/triggers/test_http.py
+++ b/providers/http/tests/unit/http/triggers/test_http.py
@@ -17,8 +17,6 @@
 # under the License.
 from __future__ import annotations
 
-import base64
-import pickle
 from asyncio import Future
 from http.cookies import SimpleCookie
 from typing import Any
@@ -31,7 +29,12 @@ from requests.structures import CaseInsensitiveDict
 from yarl import URL
 
 from airflow.models import Connection
-from airflow.providers.http.triggers.http import HttpEventTrigger, HttpSensorTrigger, HttpTrigger
+from airflow.providers.http.triggers.http import (
+    HttpEventTrigger,
+    HttpResponseSerializer,
+    HttpSensorTrigger,
+    HttpTrigger,
+)
 from airflow.triggers.base import TriggerEvent
 
 HTTP_PATH = "airflow.providers.http.triggers.http.{}"
@@ -144,10 +147,7 @@ class TestHttpTrigger:
         generator = trigger.run()
         actual = await generator.asend(None)
         assert actual == TriggerEvent(
-            {
-                "status": "success",
-                "response": 
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
-            }
+            {"status": "success", "response": 
HttpResponseSerializer.serialize(response)}
         )
 
     @pytest.mark.asyncio
@@ -252,7 +252,7 @@ class TestHttpEventTrigger:
         assert actual == TriggerEvent(
             {
                 "status": "success",
-                "response": 
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+                "response": HttpResponseSerializer.serialize(response),
             }
         )
         assert mock_hook.return_value.run.call_count == 2
