This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 97839f7b0a8 Replace pickle with json serialization for http triggers (#61662)
97839f7b0a8 is described below
commit 97839f7b0a8ae66d6079bb7fad5a363068f61617
Author: Amogh Desai <[email protected]>
AuthorDate: Thu Feb 12 03:37:53 2026 +0530
Replace pickle with json serialization for http triggers (#61662)
* Replace pickle with json serialization for http triggers
* fixing CI tests
* no need to handle compat
* adding in changelog
* adding in changelog and updating to 6.0.0
---
providers/http/docs/changelog.rst | 14 ++++++
providers/http/docs/index.rst | 6 +--
providers/http/provider.yaml | 1 +
providers/http/pyproject.toml | 6 +--
.../http/src/airflow/providers/http/__init__.py | 2 +-
.../src/airflow/providers/http/operators/http.py | 6 +--
.../src/airflow/providers/http/triggers/http.py | 50 ++++++++++++++++++++--
.../http/tests/unit/http/operators/test_http.py | 24 +++++++++--
.../http/tests/unit/http/triggers/test_http.py | 16 +++----
9 files changed, 100 insertions(+), 25 deletions(-)
diff --git a/providers/http/docs/changelog.rst b/providers/http/docs/changelog.rst
index b82db282641..497cb2d0929 100644
--- a/providers/http/docs/changelog.rst
+++ b/providers/http/docs/changelog.rst
@@ -27,6 +27,20 @@
Changelog
---------
+6.0.0
+.....
+
+Breaking changes
+~~~~~~~~~~~~~~~~
+
+.. warning::
+ The HTTP provider now uses JSON-based serialization for HTTP responses in deferred tasks
+ instead of pickle. Deferred HTTP tasks from previous provider versions will fail with a
+ ``TypeError`` after upgrade.
+
+ Before upgrading, ensure all HTTP tasks with ``deferrable=True`` that are currently in
+ ``deferred`` state have completed or been cleared.
+
5.6.4
.....
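As a rough illustration of the pre-upgrade check described in the warning above, something along these lines can surface task instances still sitting in ``deferred`` state. This is a sketch only: it assumes direct ORM access to the metadata database, and the HttpOperator/HttpSensor name filter is a guess that may need widening for custom subclasses.

    # Hypothetical pre-upgrade check: list task instances still deferred so
    # they can be completed or cleared before installing provider 6.0.0.
    from airflow.models import TaskInstance
    from airflow.utils.session import create_session
    from airflow.utils.state import TaskInstanceState

    with create_session() as session:
        deferred = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == TaskInstanceState.DEFERRED)
            # Assumed filter: narrow to HTTP tasks by operator class name;
            # custom subclasses would need their names added here.
            .filter(TaskInstance.operator.in_(["HttpOperator", "HttpSensor"]))
            .all()
        )
        for ti in deferred:
            print(ti.dag_id, ti.task_id, ti.run_id)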
diff --git a/providers/http/docs/index.rst b/providers/http/docs/index.rst
index 82127b2cbae..20bd0f9951d 100644
--- a/providers/http/docs/index.rst
+++ b/providers/http/docs/index.rst
@@ -78,7 +78,7 @@ apache-airflow-providers-http package
`Hypertext Transfer Protocol (HTTP) <https://www.w3.org/Protocols/>`__
-Release: 5.6.4
+Release: 6.0.0
Provider package
----------------
@@ -134,5 +134,5 @@ Downloading official packages
You can download officially released packages and verify their checksums and signatures from the
`Official Apache Download site <https://downloads.apache.org/airflow/providers/>`_
-* `The apache-airflow-providers-http 5.6.4 sdist package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4.tar.gz>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4.tar.gz.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4.tar.gz.sha512>`__)
-* `The apache-airflow-providers-http 5.6.4 wheel package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4-py3-none-any.whl>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4-py3-none-any.whl.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-5.6.4-py3-none-any.whl.sha512>`__)
+* `The apache-airflow-providers-http 6.0.0 sdist package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0.tar.gz>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0.tar.gz.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0.tar.gz.sha512>`__)
+* `The apache-airflow-providers-http 6.0.0 wheel package <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0-py3-none-any.whl>`_ (`asc <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0-py3-none-any.whl.asc>`__, `sha512 <https://downloads.apache.org/airflow/providers/apache_airflow_providers_http-6.0.0-py3-none-any.whl.sha512>`__)
diff --git a/providers/http/provider.yaml b/providers/http/provider.yaml
index 371886af80f..a5ad9baa1db 100644
--- a/providers/http/provider.yaml
+++ b/providers/http/provider.yaml
@@ -28,6 +28,7 @@ source-date-epoch: 1769537221
# In such case adding >= NEW_VERSION and bumping to NEW_VERSION in a provider have
# to be done in the same PR
versions:
+ - 6.0.0
- 5.6.4
- 5.6.3
- 5.6.2
diff --git a/providers/http/pyproject.toml b/providers/http/pyproject.toml
index 47dc55c7d26..0260e850eb4 100644
--- a/providers/http/pyproject.toml
+++ b/providers/http/pyproject.toml
@@ -25,7 +25,7 @@ build-backend = "flit_core.buildapi"
[project]
name = "apache-airflow-providers-http"
-version = "5.6.4"
+version = "6.0.0"
description = "Provider package apache-airflow-providers-http for Apache Airflow"
readme = "README.rst"
license = "Apache-2.0"
@@ -103,8 +103,8 @@ apache-airflow-providers-common-sql = {workspace = true}
apache-airflow-providers-standard = {workspace = true}
[project.urls]
-"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-http/5.6.4"
-"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-http/5.6.4/changelog.html"
+"Documentation" =
"https://airflow.apache.org/docs/apache-airflow-providers-http/6.0.0"
+"Changelog" =
"https://airflow.apache.org/docs/apache-airflow-providers-http/6.0.0/changelog.html"
"Bug Tracker" = "https://github.com/apache/airflow/issues"
"Source Code" = "https://github.com/apache/airflow"
"Slack Chat" = "https://s.apache.org/airflow-slack"
diff --git a/providers/http/src/airflow/providers/http/__init__.py b/providers/http/src/airflow/providers/http/__init__.py
index c6da94ab750..21b720ad23f 100644
--- a/providers/http/src/airflow/providers/http/__init__.py
+++ b/providers/http/src/airflow/providers/http/__init__.py
@@ -29,7 +29,7 @@ from airflow import __version__ as airflow_version
__all__ = ["__version__"]
-__version__ = "5.6.4"
+__version__ = "6.0.0"
if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.11.0"
diff --git a/providers/http/src/airflow/providers/http/operators/http.py b/providers/http/src/airflow/providers/http/operators/http.py
index d349b4228a1..1c5da7688e5 100644
--- a/providers/http/src/airflow/providers/http/operators/http.py
+++ b/providers/http/src/airflow/providers/http/operators/http.py
@@ -17,8 +17,6 @@
# under the License.
from __future__ import annotations
-import base64
-import pickle
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any
@@ -26,7 +24,7 @@ from aiohttp import BasicAuth
from requests import Response
from airflow.providers.common.compat.sdk import AirflowException, BaseHook, BaseOperator, conf
-from airflow.providers.http.triggers.http import HttpTrigger, serialize_auth_type
+from airflow.providers.http.triggers.http import HttpResponseSerializer, HttpTrigger, serialize_auth_type
from airflow.utils.helpers import merge_dicts
if TYPE_CHECKING:
@@ -286,7 +284,7 @@ class HttpOperator(BaseOperator):
Relies on trigger to throw an exception, otherwise it assumes
execution was successful.
"""
if event["status"] == "success":
- response = pickle.loads(base64.standard_b64decode(event["response"]))
+ response = HttpResponseSerializer.deserialize(event["response"])
self.paginate_async(context=context, response=response, previous_responses=paginated_responses)
return self.process_response(context=context, response=response)
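For context, the resume path now works on a plain dict rather than a pickled blob. A minimal sketch follows; the event dict is hand-built to mirror what HttpResponseSerializer.serialize emits, and all field values are made up:

    import base64

    from airflow.providers.http.triggers.http import HttpResponseSerializer

    # Hand-built stand-in for the trigger's success event payload.
    event = {
        "status": "success",
        "response": {
            "status_code": 200,
            "headers": {"Content-Type": "text/plain"},
            "content": base64.standard_b64encode(b"content").decode("ascii"),
            "url": "https://example.com/",
            "reason": "OK",
            "encoding": "utf-8",
            "cookies": {},
            "history": [],
        },
    }
    # execute_complete() does the equivalent of this on resume:
    response = HttpResponseSerializer.deserialize(event["response"])
    assert response.status_code == 200 and response.text == "content"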
diff --git a/providers/http/src/airflow/providers/http/triggers/http.py b/providers/http/src/airflow/providers/http/triggers/http.py
index 4323f0621fb..ec77d2634ee 100644
--- a/providers/http/src/airflow/providers/http/triggers/http.py
+++ b/providers/http/src/airflow/providers/http/triggers/http.py
@@ -20,7 +20,6 @@ import asyncio
import base64
import importlib
import inspect
-import pickle
import sys
from collections.abc import AsyncIterator
from importlib import import_module
@@ -61,6 +60,51 @@ def deserialize_auth_type(path: str | None) -> type | None:
return getattr(import_module(module_path), cls_name)
+class HttpResponseSerializer:
+ """Serializer for requests.Response objects used in deferred HTTP tasks."""
+
+ @staticmethod
+ def serialize(response: requests.Response) -> dict[str, Any]:
+ """Convert a requests.Response object to a JSON serializable
dictionary."""
+ return {
+ "status_code": response.status_code,
+ "headers": dict(response.headers),
+ "content":
base64.standard_b64encode(response.content).decode("ascii"),
+ "url": response.url,
+ "reason": response.reason,
+ "encoding": response.encoding,
+ "cookies": {k: v for k, v in response.cookies.items()},
+ "history": [HttpResponseSerializer.serialize(h) for h in
response.history],
+ }
+
+ @staticmethod
+ def deserialize(data: dict[str, Any] | str) -> requests.Response:
+ """Reconstruct a requests.Response object from serialized data."""
+ if isinstance(data, str):
+ raise TypeError("Response data must be a dict, got str")
+
+ if not isinstance(data, dict):
+ raise TypeError(f"Expected dict, got {type(data).__name__}")
+
+ response = requests.Response()
+ response.status_code = data["status_code"]
+ response.headers = CaseInsensitiveDict(data["headers"])
+ response._content = base64.standard_b64decode(data["content"])
+ response.url = data["url"]
+ response.reason = data.get("reason", "")
+ response.encoding = data.get("encoding")
+
+ cookies = RequestsCookieJar()
+ for name, value in data.get("cookies", {}).items():
+ cookies.set(name, str(value))
+ response.cookies = cookies
+
+ if data.get("history"):
+ response.history = [HttpResponseSerializer.deserialize(hist) for hist in data["history"]]
+
+ return response
+
+
class HttpTrigger(BaseTrigger):
"""
HttpTrigger run on the trigger worker.
@@ -121,7 +165,7 @@ class HttpTrigger(BaseTrigger):
yield TriggerEvent(
{
"status": "success",
- "response":
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+ "response": HttpResponseSerializer.serialize(response),
}
)
except Exception as e:
@@ -301,7 +345,7 @@ class HttpEventTrigger(HttpTrigger, BaseEventTrigger):
yield TriggerEvent(
{
"status": "success",
- "response":
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+ "response": HttpResponseSerializer.serialize(response),
}
)
except Exception as e:
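A quick round-trip through the serializer added above, showing that the payload survives an actual JSON encode/decode, which the old pickled bytes never could. Sample field values are made up:

    import json

    import requests

    from airflow.providers.http.triggers.http import HttpResponseSerializer

    original = requests.Response()
    original.status_code = 201
    original._content = b'{"id": 42}'
    original.url = "https://example.com/items"
    original.encoding = "utf-8"

    # The serialized form is plain JSON, so it round-trips through json.dumps.
    payload = json.loads(json.dumps(HttpResponseSerializer.serialize(original)))
    restored = HttpResponseSerializer.deserialize(payload)

    assert restored.status_code == 201
    assert restored.json() == {"id": 42}
    assert restored.url == original.url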
diff --git a/providers/http/tests/unit/http/operators/test_http.py b/providers/http/tests/unit/http/operators/test_http.py
index 09fae893bbf..50b66e5f612 100644
--- a/providers/http/tests/unit/http/operators/test_http.py
+++ b/providers/http/tests/unit/http/operators/test_http.py
@@ -36,7 +36,7 @@ from airflow.models import Connection
from airflow.providers.common.compat.sdk import AirflowException, TaskDeferred
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import HttpOperator
-from airflow.providers.http.triggers.http import HttpTrigger, serialize_auth_type
+from airflow.providers.http.triggers.http import HttpResponseSerializer, HttpTrigger, serialize_auth_type
@mock.patch.dict("os.environ",
AIRFLOW_CONN_HTTP_EXAMPLE="http://www.example.com")
@@ -124,11 +124,29 @@ class TestHttpOperator:
context={},
event={
"status": "success",
- "response":
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+ "response": HttpResponseSerializer.serialize(response),
},
)
assert result == "content"
+ def test_async_execute_legacy_pickle_format_raise_error(self):
+ """Test error raised with legacy pickle format."""
+ operator = HttpOperator(
+ task_id="test_HTTP_op",
+ deferrable=True,
+ )
+ response = Response()
+ response._content = b"content"
+
+ with pytest.raises(TypeError, match="Response data must be a dict, got str"):
+ _ = operator.execute_complete(
+ context={},
+ event={
+ "status": "success",
+ "response":
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+ },
+ )
+
@pytest.mark.parametrize(
(
"data",
@@ -223,7 +241,7 @@ class TestHttpOperator:
context={},
event={
"status": "success",
- "response":
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+ "response": HttpResponseSerializer.serialize(response),
},
)
diff --git a/providers/http/tests/unit/http/triggers/test_http.py b/providers/http/tests/unit/http/triggers/test_http.py
index 0e47c5559c1..e3338674af4 100644
--- a/providers/http/tests/unit/http/triggers/test_http.py
+++ b/providers/http/tests/unit/http/triggers/test_http.py
@@ -17,8 +17,6 @@
# under the License.
from __future__ import annotations
-import base64
-import pickle
from asyncio import Future
from http.cookies import SimpleCookie
from typing import Any
@@ -31,7 +29,12 @@ from requests.structures import CaseInsensitiveDict
from yarl import URL
from airflow.models import Connection
-from airflow.providers.http.triggers.http import HttpEventTrigger, HttpSensorTrigger, HttpTrigger
+from airflow.providers.http.triggers.http import (
+ HttpEventTrigger,
+ HttpResponseSerializer,
+ HttpSensorTrigger,
+ HttpTrigger,
+)
from airflow.triggers.base import TriggerEvent
HTTP_PATH = "airflow.providers.http.triggers.http.{}"
@@ -144,10 +147,7 @@ class TestHttpTrigger:
generator = trigger.run()
actual = await generator.asend(None)
assert actual == TriggerEvent(
- {
- "status": "success",
- "response":
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
- }
+ {"status": "success", "response":
HttpResponseSerializer.serialize(response)}
)
@pytest.mark.asyncio
@@ -252,7 +252,7 @@ class TestHttpEventTrigger:
assert actual == TriggerEvent(
{
"status": "success",
- "response":
base64.standard_b64encode(pickle.dumps(response)).decode("ascii"),
+ "response": HttpResponseSerializer.serialize(response),
}
)
assert mock_hook.return_value.run.call_count == 2
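For reference, only tasks that actually defer hit the changed code path. A minimal sketch of such a task; the connection id and endpoint are placeholders:

    from airflow.providers.http.operators.http import HttpOperator

    check_api = HttpOperator(
        task_id="check_api",
        http_conn_id="http_default",  # placeholder connection
        endpoint="health",  # placeholder endpoint
        method="GET",
        deferrable=True,  # defers to HttpTrigger and resumes via execute_complete
    )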