gemini-code-assist[bot] commented on code in PR #39008:
URL: https://github.com/apache/beam/pull/39008#discussion_r3431669058


##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -77,6 +77,24 @@ def default_gcs_bucket_name(project, region):
       region, md5(project.encode('utf8')).hexdigest())
 
 
+def _get_project_number(project_id, credentials=None):
+  """Resolves a project ID to its project number using Cloud Resource Manager API."""
+  from google.cloud import resourcemanager_v3
+  client = resourcemanager_v3.ProjectsClient(credentials=credentials)
+  project_info = client.get_project(name=f"projects/{project_id}")
+  # project_info.name is of the form "projects/PROJECT_NUMBER"
+  return int(project_info.name.split('/')[-1])
+
+
+def _validate_bucket_project(bucket, project_id, credentials=None):
+  """Verifies that the GCS bucket is owned by the executing project."""
+  bucket_project_number = bucket.project_number
+  project_number = _get_project_number(project_id, credentials=credentials)
+  if bucket_project_number != project_number:
+    raise ValueError(
+        f'Bucket gs://{bucket.name} is not owned by project {project_id}.')

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   To prevent pipeline startup failures, we should make this validation more 
defensive:
   1. Handle cases where `bucket.project_number` is `None` (e.g., when using 
local GCS emulators or mock buckets in tests).
   2. Catch potential exceptions from `_get_project_number` (such as when the 
Cloud Resource Manager API is not enabled, or the executing service account 
lacks permissions to call it) and log a warning instead of crashing the 
pipeline.
   
   ```python
   def _validate_bucket_project(bucket, project_id, credentials=None):
     """Verifies that the GCS bucket is owned by the executing project."""
     bucket_project_number = bucket.project_number
     if bucket_project_number is None:
       _LOGGER.warning(
           'Bucket gs://%s does not have a project number. '
           'Skipping ownership validation.',
           bucket.name)
       return
   
     try:
       project_number = _get_project_number(project_id, credentials=credentials)
     except Exception as e:
       _LOGGER.warning(
           'Failed to resolve project number for project %s: %s. '
           'Skipping bucket ownership validation.',
           project_id,
           e)
       return
   
     if bucket_project_number != project_number:
       raise ValueError(
           f'Bucket gs://{bucket.name} is not owned by project {project_id}.')
   ```



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -77,6 +77,24 @@ def default_gcs_bucket_name(project, region):
       region, md5(project.encode('utf8')).hexdigest())
 
 
+def _get_project_number(project_id, credentials=None):
+  """Resolves a project ID to its project number using Cloud Resource Manager API."""
+  from google.cloud import resourcemanager_v3
+  client = resourcemanager_v3.ProjectsClient(credentials=credentials)
+  project_info = client.get_project(name=f"projects/{project_id}")
+  # project_info.name is of the form "projects/PROJECT_NUMBER"
+  return int(project_info.name.split('/')[-1])

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `project_id` is already a project number (either as an integer or a 
numeric string), we can return it directly and avoid making a redundant, slow 
network call to the Cloud Resource Manager API.
   
   ```suggestion
   def _get_project_number(project_id, credentials=None):
     """Resolves a project ID to its project number using Cloud Resource Manager API."""
     if isinstance(project_id, int):
       return project_id
     if isinstance(project_id, str) and project_id.isdigit():
       return int(project_id)
     from google.cloud import resourcemanager_v3
     client = resourcemanager_v3.ProjectsClient(credentials=credentials)
     project_info = client.get_project(name=f"projects/{project_id}")
     # project_info.name is of the form "projects/PROJECT_NUMBER"
     return int(project_info.name.split('/')[-1])
   ```



##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -90,16 +108,18 @@ def get_or_create_default_gcs_bucket(options):
     return None
 
   bucket_name = default_gcs_bucket_name(project, region)
-  bucket = GcsIO(pipeline_options=options).get_bucket(bucket_name)
+  gcs = GcsIO(pipeline_options=options)
+  bucket = gcs.get_bucket(bucket_name)
   if bucket:
+    # Validate project ownership of the pre-existing bucket
+    _validate_bucket_project(bucket, project, credentials=gcs.client._credentials)

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Accessing the private attribute `_credentials` directly on `gcs.client` is
   risky: private attributes are not part of the public API and can change or
   be removed in future versions of `google-cloud-storage`. Reading it via
   `getattr` with a `None` fallback is safer, because a missing attribute then
   results in `credentials=None` being passed instead of an `AttributeError`.
   
   ```python
       # Validate project ownership of the pre-existing bucket
       _validate_bucket_project(
           bucket,
           project,
           credentials=getattr(gcs.client, '_credentials', None))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to