This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a1e1dc60fb handle tzinfo in S3Hook.is_keys_unchanged_async (#36363)
a1e1dc60fb is described below

commit a1e1dc60fb70d102451a6a819ccc78c079b65ddd
Author: Wei Lee <weilee...@gmail.com>
AuthorDate: Mon Dec 25 19:29:36 2023 +0530

    handle tzinfo in S3Hook.is_keys_unchanged_async (#36363)
    
    * fix(providers/amazon): handle tzinfo in S3Hook.is_keys_unchanged_async
    
    * test(providers/amazon): add timezone info to is_keys_unchanged_async test case
    
    * test(providers/amazon): add test case to test last_activity_time without tzinfo
---
 airflow/providers/amazon/aws/hooks/s3.py    |  4 ++-
 tests/providers/amazon/aws/hooks/test_s3.py | 53 ++++++++++++++++++++++++++++-
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/amazon/aws/hooks/s3.py b/airflow/providers/amazon/aws/hooks/s3.py
index 6bd0b0a750..d75e3e337a 100644
--- a/airflow/providers/amazon/aws/hooks/s3.py
+++ b/airflow/providers/amazon/aws/hooks/s3.py
@@ -713,7 +713,9 @@ class S3Hook(AwsBaseHook):
             }
 
         if last_activity_time:
-            inactivity_seconds = int((datetime.now() - last_activity_time).total_seconds())
+            inactivity_seconds = int(
+                (datetime.now(last_activity_time.tzinfo) - last_activity_time).total_seconds()
+            )
         else:
             # Handles the first poke where last inactivity time is None.
             last_activity_time = datetime.now()
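
A note on the hunk above, not part of the patch: the pre-patch expression breaks as soon as
last_activity_time carries a tzinfo, because datetime.now() is offset-naive and Python refuses
to subtract an offset-aware datetime from a naive one. Passing last_activity_time.tzinfo to
datetime.now() makes both operands match. A minimal sketch, using an illustrative
timezone-aware timestamp:

    from datetime import datetime, timezone

    # Illustrative tz-aware value; any aware datetime triggers the same behaviour.
    last_activity_time = datetime.now(timezone.utc)

    # Pre-patch expression: naive now() minus an aware datetime raises
    # "TypeError: can't subtract offset-naive and offset-aware datetimes".
    try:
        (datetime.now() - last_activity_time).total_seconds()
    except TypeError as err:
        print(err)

    # Patched expression: datetime.now(last_activity_time.tzinfo) has the same
    # awareness as last_activity_time, so the subtraction succeeds. For a naive
    # last_activity_time, tzinfo is None and datetime.now(None) is still naive,
    # so the previous behaviour is preserved.
    inactivity_seconds = int(
        (datetime.now(last_activity_time.tzinfo) - last_activity_time).total_seconds()
    )
    print(inactivity_seconds)

The two test cases added below exercise exactly these two situations, a naive
last_activity_time and a timezone-aware one, and both should report "pending".
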
diff --git a/tests/providers/amazon/aws/hooks/test_s3.py b/tests/providers/amazon/aws/hooks/test_s3.py
index bc4448cebc..b1b52b3557 100644
--- a/tests/providers/amazon/aws/hooks/test_s3.py
+++ b/tests/providers/amazon/aws/hooks/test_s3.py
@@ -22,6 +22,7 @@ import inspect
 import os
 import re
 import unittest
+from datetime import datetime as std_datetime, timezone
 from unittest import mock, mock as async_mock
 from unittest.mock import MagicMock, Mock, patch
 from urllib.parse import parse_qs
@@ -762,7 +763,7 @@ class TestAwsS3Hook:
     @pytest.mark.asyncio
     @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
     @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
-    async def test_s3_key_hook_is_keys_unchanged_pending_async(self, mock_list_keys, mock_client):
+    async def test_s3_key_hook_is_keys_unchanged_async_handle_tzinfo(self, mock_list_keys, mock_client):
         """
         Test is_key_unchanged gives AirflowException.
         """
@@ -812,6 +813,56 @@ class TestAwsS3Hook:
             "message": "FAILURE: Inactivity Period passed, not enough objects 
found in test_bucket/test",
         }
 
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+    async def test_s3_key_hook_is_keys_unchanged_pending_async_without_tzinfo(
+        self, mock_list_keys, mock_client
+    ):
+        """
+        Test is_key_unchanged gives AirflowException.
+        """
+        mock_list_keys.return_value = []
+
+        s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+        response = await s3_hook_async.is_keys_unchanged_async(
+            client=mock_client.return_value,
+            bucket_name="test_bucket",
+            prefix="test",
+            inactivity_period=1,
+            min_objects=0,
+            previous_objects=set(),
+            inactivity_seconds=0,
+            allow_delete=False,
+            last_activity_time=std_datetime.now(),
+        )
+        assert response.get("status") == "pending"
+
+    @pytest.mark.asyncio
+    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook.async_conn")
+    @async_mock.patch("airflow.providers.amazon.aws.triggers.s3.S3Hook._list_keys_async")
+    async def test_s3_key_hook_is_keys_unchanged_pending_async_with_tzinfo(self, mock_list_keys, mock_client):
+        """
+        Test is_key_unchanged gives AirflowException.
+        """
+        mock_list_keys.return_value = []
+
+        s3_hook_async = S3Hook(client_type="S3", resource_type="S3")
+
+        response = await s3_hook_async.is_keys_unchanged_async(
+            client=mock_client.return_value,
+            bucket_name="test_bucket",
+            prefix="test",
+            inactivity_period=1,
+            min_objects=0,
+            previous_objects=set(),
+            inactivity_seconds=0,
+            allow_delete=False,
+            last_activity_time=std_datetime.now(timezone.utc),
+        )
+        assert response.get("status") == "pending"
+
     def test_load_bytes(self, s3_bucket):
         hook = S3Hook()
         hook.load_bytes(b"Content", "my_key", s3_bucket)
