This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-8-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 08785f50f15a419e91a3eec03a60d9d0b9173343
Author: dmedora <21352998+dmed...@users.noreply.github.com>
AuthorDate: Wed Nov 22 01:21:59 2023 -0800

    added Topic params for schema_settings and message_retention_duration. 
(#35767)
---
 airflow/providers/google/cloud/hooks/pubsub.py        | 10 ++++++++++
 airflow/providers/google/cloud/operators/pubsub.py    |  7 +++++++
 tests/providers/google/cloud/hooks/test_pubsub.py     |  9 ++++++++-
 tests/providers/google/cloud/operators/test_pubsub.py |  4 ++++
 4 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/hooks/pubsub.py 
b/airflow/providers/google/cloud/hooks/pubsub.py
index 1573f32841..0f6e5fcff8 100644
--- a/airflow/providers/google/cloud/hooks/pubsub.py
+++ b/airflow/providers/google/cloud/hooks/pubsub.py
@@ -57,6 +57,7 @@ if TYPE_CHECKING:
         PushConfig,
         ReceivedMessage,
         RetryPolicy,
+        SchemaSettings,
     )
 
 
@@ -182,6 +183,8 @@ class PubSubHook(GoogleBaseHook):
         labels: dict[str, str] | None = None,
         message_storage_policy: dict | MessageStoragePolicy = None,
         kms_key_name: str | None = None,
+        schema_settings: dict | SchemaSettings = None,
+        message_retention_duration: str | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
@@ -206,6 +209,11 @@ class PubSubHook(GoogleBaseHook):
             to be used to protect access to messages published on this topic.
             The expected format is
             ``projects/*/locations/*/keyRings/*/cryptoKeys/*``.
+        :param schema_settings: (Optional) Settings for validating messages 
published against an
+            existing schema. The expected format is ``projects/*/schemas/*``.
+        :param message_retention_duration: (Optional) Indicates the minimum 
duration to retain a
+            message after it is published to the topic. The expected format is 
a duration in
+            seconds with up to nine fractional digits, ending with 's'. 
Example: "3.5s".
         :param retry: (Optional) A retry object used to retry requests.
             If None is specified, requests will not be retried.
         :param timeout: (Optional) The amount of time, in seconds, to wait for 
the request
@@ -228,6 +236,8 @@ class PubSubHook(GoogleBaseHook):
                     "labels": labels,
                     "message_storage_policy": message_storage_policy,
                     "kms_key_name": kms_key_name,
+                    "schema_settings": schema_settings,
+                    "message_retention_duration": message_retention_duration,
                 },
                 retry=retry,
                 timeout=timeout,
diff --git a/airflow/providers/google/cloud/operators/pubsub.py 
b/airflow/providers/google/cloud/operators/pubsub.py
index b8c90be0fe..3751e9a371 100644
--- a/airflow/providers/google/cloud/operators/pubsub.py
+++ b/airflow/providers/google/cloud/operators/pubsub.py
@@ -35,6 +35,7 @@ from google.cloud.pubsub_v1.types import (
     PushConfig,
     ReceivedMessage,
     RetryPolicy,
+    SchemaSettings,
 )
 
 from airflow.providers.google.cloud.hooks.pubsub import PubSubHook
@@ -130,6 +131,8 @@ class PubSubCreateTopicOperator(GoogleCloudBaseOperator):
         labels: dict[str, str] | None = None,
         message_storage_policy: dict | MessageStoragePolicy = None,
         kms_key_name: str | None = None,
+        schema_settings: dict | SchemaSettings = None,
+        message_retention_duration: str | None = None,
         retry: Retry | _MethodDefault = DEFAULT,
         timeout: float | None = None,
         metadata: Sequence[tuple[str, str]] = (),
@@ -144,6 +147,8 @@ class PubSubCreateTopicOperator(GoogleCloudBaseOperator):
         self.labels = labels
         self.message_storage_policy = message_storage_policy
         self.kms_key_name = kms_key_name
+        self.schema_settings = schema_settings
+        self.message_retention_duration = message_retention_duration
         self.retry = retry
         self.timeout = timeout
         self.metadata = metadata
@@ -163,6 +168,8 @@ class PubSubCreateTopicOperator(GoogleCloudBaseOperator):
             labels=self.labels,
             message_storage_policy=self.message_storage_policy,
             kms_key_name=self.kms_key_name,
+            schema_settings=self.schema_settings,
+            message_retention_duration=self.message_retention_duration,
             retry=self.retry,
             timeout=self.timeout,
             metadata=self.metadata,
diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py 
b/tests/providers/google/cloud/hooks/test_pubsub.py
index fe25df6562..04c53cdae2 100644
--- a/tests/providers/google/cloud/hooks/test_pubsub.py
+++ b/tests/providers/google/cloud/hooks/test_pubsub.py
@@ -103,7 +103,14 @@ class TestPubSubHook:
         create_method = mock_service.return_value.create_topic
         self.pubsub_hook.create_topic(project_id=TEST_PROJECT, 
topic=TEST_TOPIC)
         create_method.assert_called_once_with(
-            request=dict(name=EXPANDED_TOPIC, labels=LABELS, 
message_storage_policy=None, kms_key_name=None),
+            request=dict(
+                name=EXPANDED_TOPIC,
+                labels=LABELS,
+                message_storage_policy=None,
+                kms_key_name=None,
+                schema_settings=None,
+                message_retention_duration=None,
+            ),
             retry=DEFAULT,
             timeout=None,
             metadata=(),
diff --git a/tests/providers/google/cloud/operators/test_pubsub.py 
b/tests/providers/google/cloud/operators/test_pubsub.py
index f2a9fb0e9b..0598214f10 100644
--- a/tests/providers/google/cloud/operators/test_pubsub.py
+++ b/tests/providers/google/cloud/operators/test_pubsub.py
@@ -59,6 +59,8 @@ class TestPubSubTopicCreateOperator:
             labels=None,
             message_storage_policy=None,
             kms_key_name=None,
+            schema_settings=None,
+            message_retention_duration=None,
             retry=DEFAULT,
             timeout=None,
             metadata=(),
@@ -79,6 +81,8 @@ class TestPubSubTopicCreateOperator:
             labels=None,
             message_storage_policy=None,
             kms_key_name=None,
+            schema_settings=None,
+            message_retention_duration=None,
             retry=DEFAULT,
             timeout=None,
             metadata=(),

Reply via email to