This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-8-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 08785f50f15a419e91a3eec03a60d9d0b9173343 Author: dmedora <21352998+dmed...@users.noreply.github.com> AuthorDate: Wed Nov 22 01:21:59 2023 -0800 added Topic params for schema_settings and message_retention_duration. (#35767) --- airflow/providers/google/cloud/hooks/pubsub.py | 10 ++++++++++ airflow/providers/google/cloud/operators/pubsub.py | 7 +++++++ tests/providers/google/cloud/hooks/test_pubsub.py | 9 ++++++++- tests/providers/google/cloud/operators/test_pubsub.py | 4 ++++ 4 files changed, 29 insertions(+), 1 deletion(-) diff --git a/airflow/providers/google/cloud/hooks/pubsub.py b/airflow/providers/google/cloud/hooks/pubsub.py index 1573f32841..0f6e5fcff8 100644 --- a/airflow/providers/google/cloud/hooks/pubsub.py +++ b/airflow/providers/google/cloud/hooks/pubsub.py @@ -57,6 +57,7 @@ if TYPE_CHECKING: PushConfig, ReceivedMessage, RetryPolicy, + SchemaSettings, ) @@ -182,6 +183,8 @@ class PubSubHook(GoogleBaseHook): labels: dict[str, str] | None = None, message_storage_policy: dict | MessageStoragePolicy = None, kms_key_name: str | None = None, + schema_settings: dict | SchemaSettings = None, + message_retention_duration: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -206,6 +209,11 @@ class PubSubHook(GoogleBaseHook): to be used to protect access to messages published on this topic. The expected format is ``projects/*/locations/*/keyRings/*/cryptoKeys/*``. + :param schema_settings: (Optional) Settings for validating messages published against an + existing schema. The expected format is ``projects/*/schemas/*``. + :param message_retention_duration: (Optional) Indicates the minimum duration to retain a + message after it is published to the topic. The expected format is a duration in + seconds with up to nine fractional digits, ending with 's'. Example: "3.5s". :param retry: (Optional) A retry object used to retry requests. If None is specified, requests will not be retried. :param timeout: (Optional) The amount of time, in seconds, to wait for the request @@ -228,6 +236,8 @@ class PubSubHook(GoogleBaseHook): "labels": labels, "message_storage_policy": message_storage_policy, "kms_key_name": kms_key_name, + "schema_settings": schema_settings, + "message_retention_duration": message_retention_duration, }, retry=retry, timeout=timeout, diff --git a/airflow/providers/google/cloud/operators/pubsub.py b/airflow/providers/google/cloud/operators/pubsub.py index b8c90be0fe..3751e9a371 100644 --- a/airflow/providers/google/cloud/operators/pubsub.py +++ b/airflow/providers/google/cloud/operators/pubsub.py @@ -35,6 +35,7 @@ from google.cloud.pubsub_v1.types import ( PushConfig, ReceivedMessage, RetryPolicy, + SchemaSettings, ) from airflow.providers.google.cloud.hooks.pubsub import PubSubHook @@ -130,6 +131,8 @@ class PubSubCreateTopicOperator(GoogleCloudBaseOperator): labels: dict[str, str] | None = None, message_storage_policy: dict | MessageStoragePolicy = None, kms_key_name: str | None = None, + schema_settings: dict | SchemaSettings = None, + message_retention_duration: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), @@ -144,6 +147,8 @@ class PubSubCreateTopicOperator(GoogleCloudBaseOperator): self.labels = labels self.message_storage_policy = message_storage_policy self.kms_key_name = kms_key_name + self.schema_settings = schema_settings + self.message_retention_duration = message_retention_duration self.retry = retry self.timeout = timeout self.metadata = metadata @@ -163,6 +168,8 @@ class PubSubCreateTopicOperator(GoogleCloudBaseOperator): labels=self.labels, message_storage_policy=self.message_storage_policy, kms_key_name=self.kms_key_name, + schema_settings=self.schema_settings, + message_retention_duration=self.message_retention_duration, retry=self.retry, timeout=self.timeout, metadata=self.metadata, diff --git a/tests/providers/google/cloud/hooks/test_pubsub.py b/tests/providers/google/cloud/hooks/test_pubsub.py index fe25df6562..04c53cdae2 100644 --- a/tests/providers/google/cloud/hooks/test_pubsub.py +++ b/tests/providers/google/cloud/hooks/test_pubsub.py @@ -103,7 +103,14 @@ class TestPubSubHook: create_method = mock_service.return_value.create_topic self.pubsub_hook.create_topic(project_id=TEST_PROJECT, topic=TEST_TOPIC) create_method.assert_called_once_with( - request=dict(name=EXPANDED_TOPIC, labels=LABELS, message_storage_policy=None, kms_key_name=None), + request=dict( + name=EXPANDED_TOPIC, + labels=LABELS, + message_storage_policy=None, + kms_key_name=None, + schema_settings=None, + message_retention_duration=None, + ), retry=DEFAULT, timeout=None, metadata=(), diff --git a/tests/providers/google/cloud/operators/test_pubsub.py b/tests/providers/google/cloud/operators/test_pubsub.py index f2a9fb0e9b..0598214f10 100644 --- a/tests/providers/google/cloud/operators/test_pubsub.py +++ b/tests/providers/google/cloud/operators/test_pubsub.py @@ -59,6 +59,8 @@ class TestPubSubTopicCreateOperator: labels=None, message_storage_policy=None, kms_key_name=None, + schema_settings=None, + message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=(), @@ -79,6 +81,8 @@ class TestPubSubTopicCreateOperator: labels=None, message_storage_policy=None, kms_key_name=None, + schema_settings=None, + message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=(),