This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c01abd1c2e Upgrade watchtower to 3.0.1 (#25019) (#34747)
c01abd1c2e is described below

commit c01abd1c2eed8f60fec5b9d6cc0232b54efa52de
Author: cBiscuitSurprise <36653704+cbiscuitsurpr...@users.noreply.github.com>
AuthorDate: Fri Oct 6 09:35:09 2023 -0500

    Upgrade watchtower to 3.0.1 (#25019) (#34747)
---
 .../amazon/aws/log/cloudwatch_task_handler.py      | 35 ++++++++++++++-
 airflow/providers/amazon/provider.yaml             | 24 ++++++++---
 docs/apache-airflow-providers-amazon/index.rst     |  2 +-
 generated/provider_dependencies.json               |  2 +-
 .../amazon/aws/log/test_cloudwatch_task_handler.py | 50 +++++++++++++++++++++-
 5 files changed, 103 insertions(+), 10 deletions(-)

diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py 
b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
index 4d3ccec00c..126ed3ea84 100644
--- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
+++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
@@ -17,9 +17,9 @@
 # under the License.
 from __future__ import annotations
 
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
 from functools import cached_property
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 import watchtower
 
@@ -33,6 +33,35 @@ if TYPE_CHECKING:
     from airflow.models import TaskInstance
 
 
+def json_serialize_legacy(value: Any) -> str | None:
+    """
+    JSON serializer replicating legacy watchtower behavior.
+
+    Replicates the legacy `watchtower@2.0.1` JSON serializer, which serialized
+    datetime objects in ISO format and all other non-JSON-serializable objects
+    as `null`.
+
+    :param value: the object to serialize
+    :return: the ISO-formatted string if `value` is a `date` or `datetime`
+        instance; `None` otherwise
+    """
+    if isinstance(value, (date, datetime)):
+        return value.isoformat()
+    else:
+        return None
+
+
+def json_serialize(value: Any) -> str | None:
+    """
+    JSON serializer replicating current watchtower behavior.
+
+    This provides customers with an accessible import,
+    `airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize`
+
+    :param value: the object to serialize
+    :return: string representation of `value`
+    """
+    return watchtower._json_serialize_default(value)
+
+
 class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
     """
     CloudwatchTaskHandler is a python log handler that handles and reads task 
instance logs.
@@ -69,11 +98,13 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
 
     def set_context(self, ti):
         super().set_context(ti)
+        self.json_serialize = conf.getimport("aws", 
"cloudwatch_task_handler_json_serializer")
         self.handler = watchtower.CloudWatchLogHandler(
             log_group_name=self.log_group,
             log_stream_name=self._render_filename(ti, ti.try_number),
             use_queues=not getattr(ti, "is_trigger_log_context", False),
             boto3_client=self.hook.get_conn(),
+            json_serialize_default=self.json_serialize,
         )
 
     def close(self):
diff --git a/airflow/providers/amazon/provider.yaml 
b/airflow/providers/amazon/provider.yaml
index 8ce738480c..3f5ad39025 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -81,11 +81,7 @@ dependencies:
   # NOTE!!! BOTOCORE version is always shifted by 3 MINOR VERSIONS from boto3
   # See https://github.com/boto/boto3/issues/2702
   - botocore>=1.31.0
-  # watchtower 3 has been released end Jan and introduced breaking change 
across the board that might
-  # change logging behaviour:
-  # 
https://github.com/kislyuk/watchtower/blob/develop/Changes.rst#changes-for-v300-2022-01-26
-  # TODO: update to watchtower >3
-  - watchtower~=2.0.1
+  - watchtower~=3.0.1
   - jsonpath_ng>=1.5.3
   - redshift_connector>=2.0.888
   - sqlalchemy_redshift>=0.8.6
@@ -726,3 +722,21 @@ config:
         example: my_company.aws.MyCustomSessionFactory
         type: string
         version_added: 3.1.1
+      cloudwatch_task_handler_json_serializer:
+        description: |
+          By default, when logging non-string messages, all non-JSON-serializable
+          objects are logged as `null`, except `datetime` objects, which are
+          ISO formatted. Users can optionally use a `repr` serializer or
+          provide their own JSON serializer for any non-JSON-serializable 
objects in the logged message.
+
+          * 
`airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize` uses 
`repr` (be aware
+            there is the potential of logging sensitive data depending on the 
`repr` method of logged objects)
+          * 
`airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy`
 uses `null`.
+
+          If a custom serializer is provided, it must adhere to 
`Callable[[Any], str | None]`, where `None`
+          serializes to `null` (e.g. `def my_serializer(o: Any) -> str | 
None`). Since this is on the logging
+          path and it's possible there's an exception being handled, special 
care should be taken to fail
+          gracefully without raising a new exception inside of your serializer.
+        type: string
+        version_added: 8.7.2
+        example: 
airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize
+        default: 
airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize_legacy
diff --git a/docs/apache-airflow-providers-amazon/index.rst 
b/docs/apache-airflow-providers-amazon/index.rst
index d846b73c56..c1f52f9651 100644
--- a/docs/apache-airflow-providers-amazon/index.rst
+++ b/docs/apache-airflow-providers-amazon/index.rst
@@ -112,7 +112,7 @@ PIP package                              Version required
 ``boto3``                                ``>=1.28.0``
 ``botocore``                             ``>=1.31.0``
 ``asgiref``
-``watchtower``                           ``~=2.0.1``
+``watchtower``                           ``~=3.0.1``
 ``jsonpath_ng``                          ``>=1.5.3``
 ``redshift_connector``                   ``>=2.0.888``
 ``sqlalchemy_redshift``                  ``>=0.8.6``
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 212f308989..5fc8ed0df0 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -30,7 +30,7 @@
       "jsonpath_ng>=1.5.3",
       "redshift_connector>=2.0.888",
       "sqlalchemy_redshift>=0.8.6",
-      "watchtower~=2.0.1"
+      "watchtower~=3.0.1"
     ],
     "cross-providers-deps": [
       "apache.hive",
diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py 
b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
index 00d5cf2f46..b3c1cd6e39 100644
--- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
+++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py
@@ -17,10 +17,12 @@
 # under the License.
 from __future__ import annotations
 
+import contextlib
+import logging
 import time
 from datetime import datetime as dt, timedelta
 from unittest import mock
-from unittest.mock import call
+from unittest.mock import ANY, Mock, call
 
 import boto3
 import moto
@@ -171,6 +173,52 @@ class TestCloudwatchTaskHandler:
             end_time=expected_end_time,
         )
 
+    @pytest.mark.parametrize(
+        "conf_json_serialize, expected_serialized_output",
+        [
+            (None, '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": 
null}'),
+            (
+                
"airflow.providers.amazon.aws.log.cloudwatch_task_handler.json_serialize",
+                '{"datetime": "2023-01-01T00:00:00+00:00", "customObject": 
"SomeCustomSerialization(...)"}',
+            ),
+        ],
+    )
+    @mock.patch.object(AwsLogsHook, "get_log_events")
+    def test_write_json_logs(self, mock_get_log_events, conf_json_serialize, 
expected_serialized_output):
+        class ToSerialize:
+            def __init__(self):
+                pass
+
+            def __repr__(self):
+                return "SomeCustomSerialization(...)"
+
+        with contextlib.ExitStack() as stack:
+            if conf_json_serialize:
+                stack.enter_context(
+                    conf_vars({("aws", 
"cloudwatch_task_handler_json_serializer"): conf_json_serialize})
+                )
+
+            handler = self.cloudwatch_task_handler
+            handler.set_context(self.ti)
+            message = logging.LogRecord(
+                name="test_log_record",
+                level=logging.DEBUG,
+                pathname="fake.path",
+                lineno=42,
+                args=None,
+                exc_info=None,
+                msg={
+                    "datetime": datetime(2023, 1, 1),
+                    "customObject": ToSerialize(),
+                },
+            )
+            stack.enter_context(mock.patch("watchtower.threading.Thread"))
+            mock_queue = Mock()
+            stack.enter_context(mock.patch("watchtower.queue.Queue", 
return_value=mock_queue))
+            handler.handle(message)
+
+            mock_queue.put.assert_called_once_with({"message": 
expected_serialized_output, "timestamp": ANY})
+
     def test_close_prevents_duplicate_calls(self):
         with mock.patch("watchtower.CloudWatchLogHandler.close") as 
mock_log_handler_close:
             with 
mock.patch("airflow.utils.log.file_task_handler.FileTaskHandler.set_context"):

Reply via email to