shahar1 commented on code in PR #56324:
URL: https://github.com/apache/airflow/pull/56324#discussion_r2774150130


##########
providers/google/src/airflow/providers/google/common/hooks/base_google.py:
##########
@@ -317,10 +332,7 @@ def get_credentials_and_project_id(self) -> tuple[Credentials, str | None]:
 
         idp_extra_params_dict: dict[str, str] | None = None
         if idp_extra_params:
-            try:
-                idp_extra_params_dict = json.loads(idp_extra_params)
-            except json.decoder.JSONDecodeError:
-                raise AirflowException("Invalid JSON.")

Review Comment:
   Same as above, please revert



##########
providers/google/src/airflow/providers/google/common/hooks/base_google.py:
##########
@@ -286,16 +304,13 @@ def get_credentials_and_project_id(self) -> tuple[Credentials, str | None]:
             return self._cached_credentials, self._cached_project_id
 
         key_path: str | None = self._get_field("key_path", None)
-        try:
-            keyfile_dict: str | dict[str, str] | None = self._get_field("keyfile_dict", None)
-            keyfile_dict_json: dict[str, str] | None = None
-            if keyfile_dict:
-                if isinstance(keyfile_dict, dict):
-                    keyfile_dict_json = keyfile_dict
-                else:
-                    keyfile_dict_json = json.loads(keyfile_dict)
-        except json.decoder.JSONDecodeError:
-            raise AirflowException("Invalid key JSON.")

Review Comment:
   My previous comment regarding `AirflowException` applies only to new usages.
Please revert this section, as it is irrelevant to this PR.



##########
providers/google/src/airflow/providers/google/common/utils/get_secret.py:
##########
@@ -28,4 +28,5 @@ def get_secret(secret_id: str) -> str:
     hook = GoogleCloudSecretManagerHook()
     if hook.secret_exists(secret_id=secret_id):
         return hook.access_secret(secret_id=secret_id).payload.data.decode()
-    raise NotFound("The secret '%s' not found", secret_id)
+    raise NotFound(f"The secret '{secret_id}' not found")

Review Comment:
   Please revert this change: it is unrelated to this PR, and using an f-string
directly inside an exception isn't recommended practice. See:
https://docs.astral.sh/ruff/rules/f-string-in-exception/
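
For illustration only (the request above is still to revert): the linked Ruff rule suggests assigning the message to a variable before raising. A minimal sketch, assuming a stand-in `NotFound` class and a hypothetical dict-backed helper `get_secret_or_raise`, neither of which is part of this PR:

```python
from __future__ import annotations


class NotFound(Exception):
    """Stand-in exception used only for this illustration."""


def get_secret_or_raise(secrets: dict[str, str], secret_id: str) -> str:
    """Hypothetical helper: look up a secret in a plain dict."""
    if secret_id in secrets:
        return secrets[secret_id]
    # Build the message first, then raise it, so the traceback does not
    # repeat the whole f-string expression on the `raise` line.
    msg = f"The secret '{secret_id}' not found"
    raise NotFound(msg)
```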



##########
providers/google/src/airflow/providers/google/common/utils/project_id.py:
##########
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import re
+
+
+def is_valid_gcp_project_id(project_id: str) -> bool:
+    return bool(re.match(r"^[a-z][a-z0-9]{4,28}[a-z0-9]$", project_id))

Review Comment:
   This logic should have a unit test
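
A minimal sketch of what such a test could cover, with the function copied from the diff so the example is self-contained. The case table and the test name are assumptions, not existing Airflow tests. Note that the pattern as written rejects hyphens, whereas the documentation change in this PR lists hyphens as allowed, so a case such as `my-project` is left out until that is settled.

```python
import re


def is_valid_gcp_project_id(project_id: str) -> bool:
    # Copied from the diff above so the sketch runs on its own.
    return bool(re.match(r"^[a-z][a-z0-9]{4,28}[a-z0-9]$", project_id))


# (project_id, expected) pairs covering the length bounds and character rules.
CASES = [
    ("abcdef", True),       # minimum length: 6 characters
    ("a" * 30, True),       # maximum length: 30 characters
    ("project123", True),   # digits allowed after the first character
    ("abcde", False),       # too short: 5 characters
    ("a" * 31, False),      # too long: 31 characters
    ("1project", False),    # must start with a lowercase letter
    ("Project1", False),    # uppercase letters are rejected
    ("my_project", False),  # underscores are rejected
    ("", False),            # empty string
]


def test_is_valid_gcp_project_id() -> None:
    for project_id, expected in CASES:
        assert is_valid_gcp_project_id(project_id) is expected, project_id
```

In the provider's test suite this would more likely be a `pytest.mark.parametrize` test; the loop form is used here only to avoid extra dependencies.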



##########
providers/google/src/airflow/providers/google/common/hooks/base_google.py:
##########
@@ -429,14 +473,8 @@ def num_retries(self) -> int:
             return 5
         if isinstance(field_value, str) and field_value.strip() == "":
             return 5
-        try:
-            return int(field_value)
-        except ValueError:
-            raise AirflowException(
-                f"The num_retries field should be a integer. "
-                f'Current value: "{field_value}" (type: {type(field_value)}). '
-                f"Please check the connection configuration."
-            )
+        
+        return int(field_value)
 

Review Comment:
   Why was this changed? To clarify, my comment about `AirflowException`
applied to new usages only, so you may retain this one as-is. While I agree that
it doesn't seem to add much value, it isn't directly related to this PR and
could possibly cause regressions.
   Please revert this change.



##########
providers/google/src/airflow/providers/google/common/hooks/base_google.py:
##########
@@ -347,8 +372,33 @@ def get_credentials_and_project_id(self) -> tuple[Credentials, str | None]:
 
         return credentials, project_id
 
+    def _validate_quota_project(self, quota_project: str) -> None:
+        """
+        Validate the quota Project ID format.
+
+        This validates the Project ID string format only and does not verify
+        whether the project exists or is accessible.
+
+        :param quota_project: The quota Project ID to validate
+        :raises AirflowException: If the quota Project ID is invalid
+        """
+        if not isinstance(quota_project, str):
+            raise AirflowException(f"quota_project_id must be a string, got {type(quota_project)}")
+        if not quota_project.strip():
+            raise AirflowException("quota_project_id cannot be empty")
+        # Check for valid GCP Project ID format
+        if not is_valid_gcp_project_id(quota_project):
+            raise AirflowException(
+                f"Invalid quota_project_id '{quota_project}'. "
+                "Project IDs must start with a lowercase letter and can contain "
+                "only lowercase letters and digits."
+            )

Review Comment:
   These `AirflowException` usages are new, and therefore should be replaced with
something else (I'd go for `ValueError`)
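
A minimal sketch of the suggested change, written as a standalone function rather than the hook method so it runs on its own. The name `validate_quota_project` and the message-variable style are assumptions; the checks mirror the diff above.

```python
from __future__ import annotations

import re


def is_valid_gcp_project_id(project_id: str) -> bool:
    # Copied from the PR's helper so the sketch is self-contained.
    return bool(re.match(r"^[a-z][a-z0-9]{4,28}[a-z0-9]$", project_id))


def validate_quota_project(quota_project: object) -> None:
    """Validate the quota project ID format only, raising ValueError on failure."""
    if not isinstance(quota_project, str):
        msg = f"quota_project_id must be a string, got {type(quota_project)}"
        raise ValueError(msg)
    if not quota_project.strip():
        raise ValueError("quota_project_id cannot be empty")
    if not is_valid_gcp_project_id(quota_project):
        msg = (
            f"Invalid quota_project_id '{quota_project}'. "
            "Project IDs must start with a lowercase letter and can contain "
            "only lowercase letters and digits."
        )
        raise ValueError(msg)
```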



##########
providers/google/src/airflow/providers/google/common/hooks/base_google.py:
##########
@@ -338,6 +350,19 @@ def get_credentials_and_project_id(self) -> tuple[Credentials, str | None]:
             idp_extra_params_dict=idp_extra_params_dict,
         )
 
+        # Apply quota project before caching credentials
+        quota_project = self.quota_project_id or self._get_field("quota_project_id")
+        if quota_project:
+            self._validate_quota_project(quota_project)
+            if not hasattr(credentials, "with_quota_project"):
+                raise AirflowException(
+                    f"Credentials of type {type(credentials).__name__} do not support "
+                    "quota project configuration. Please use a different authentication method "
+                    "or remove the quota_project_id setting."
+                )

Review Comment:
   This `AirflowException` is new, and therefore should be replaced with
something else (I'd go for `ValueError`)
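
A rough sketch of the same check raising `ValueError`. The two credential classes are stand-ins invented for this example, and `apply_quota_project` is a hypothetical helper. The call to `with_quota_project` is an assumption about what follows the check, since the hunk above is cut off before that point.

```python
from __future__ import annotations


class PlainCredentials:
    """Stand-in credentials type without quota project support."""


class QuotaCredentials:
    """Stand-in credentials type exposing `with_quota_project`."""

    def __init__(self, quota_project_id: str | None = None) -> None:
        self.quota_project_id = quota_project_id

    def with_quota_project(self, quota_project_id: str) -> QuotaCredentials:
        # Modeled as returning a new object rather than mutating in place.
        return QuotaCredentials(quota_project_id)


def apply_quota_project(credentials: object, quota_project: str) -> object:
    """Hypothetical helper: attach a quota project or raise ValueError."""
    if not hasattr(credentials, "with_quota_project"):
        msg = (
            f"Credentials of type {type(credentials).__name__} do not support "
            "quota project configuration. Please use a different authentication method "
            "or remove the quota_project_id setting."
        )
        raise ValueError(msg)
    return credentials.with_quota_project(quota_project)
```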



##########
providers/google/src/airflow/providers/google/common/hooks/base_google.py:
##########
@@ -570,7 +608,7 @@ def provide_gcp_credential_file_as_context(self) -> Generator[str | None, None,
             )
         elif key_path:
             if key_path.endswith(".p12"):
-                raise AirflowException("Legacy P12 key file are not supported, use a JSON key file.")
+                raise AirflowException("Legacy P12 key files are not supported, use a JSON key file.")

Review Comment:
   These changes are unrelated to this PR, and possibly broke some of the tests.
Please revert them.



##########
providers/google/docs/connections/gcp.rst:
##########
@@ -121,6 +122,21 @@ Scopes (comma separated)
     <https://developers.google.com/identity/protocols/googlescopes>`_ to
     authenticate with.
 
+
+Quota Project ID (optional)
+---------------------------
+
+The Google Cloud project ID to use for API quota and billing purposes. This is useful when using a shared service account but want to attribute quota/billing to a different project. If not specified, the default project from the connection is used. Must be a valid GCP project ID (lowercase letters, digits, hyphens, 6–30 characters, starting with a letter).
+
+.. note::
+

Review Comment:
   @Ankurdeewan Please address this comment as well



##########
providers/google/src/airflow/providers/google/common/hooks/base_google.py:
##########
@@ -372,14 +422,8 @@ def _get_credentials_email(self) -> str:
         credentials = self.get_credentials()
 
         if isinstance(credentials, compute_engine.Credentials):
-            try:
-                credentials.refresh(_http_client.Request())
-            except RefreshError as msg:
-                """
-                If the Compute Engine metadata service can't be reached in this case the instance has not
-                credentials.
-                """
-                self.log.debug(msg)

Review Comment:
   Why was this changed? Please revert - I think that it could break existing 
behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.