gemini-code-assist[bot] commented on code in PR #39008:
URL: https://github.com/apache/beam/pull/39008#discussion_r3431669058
##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -77,6 +77,24 @@ def default_gcs_bucket_name(project, region):
region, md5(project.encode('utf8')).hexdigest())
+def _get_project_number(project_id, credentials=None):
+ """Resolves a project ID to its project number using Cloud Resource Manager
API."""
+ from google.cloud import resourcemanager_v3
+ client = resourcemanager_v3.ProjectsClient(credentials=credentials)
+ project_info = client.get_project(name=f"projects/{project_id}")
+ # project_info.name is of the form "projects/PROJECT_NUMBER"
+ return int(project_info.name.split('/')[-1])
+
+
+def _validate_bucket_project(bucket, project_id, credentials=None):
+ """Verifies that the GCS bucket is owned by the executing project."""
+ bucket_project_number = bucket.project_number
+ project_number = _get_project_number(project_id, credentials=credentials)
+ if bucket_project_number != project_number:
+ raise ValueError(
+ f'Bucket gs://{bucket.name} is not owned by project {project_id}.')
Review Comment:

To prevent pipeline startup failures, we should make this validation more
defensive:
1. Handle cases where `bucket.project_number` is `None` (e.g., when using
local GCS emulators or mock buckets in tests).
2. Catch potential exceptions from `_get_project_number` (such as when the
Cloud Resource Manager API is not enabled, or the executing service account
lacks permissions to call it) and log a warning instead of crashing the
pipeline.
```python
def _validate_bucket_project(bucket, project_id, credentials=None):
"""Verifies that the GCS bucket is owned by the executing project."""
bucket_project_number = bucket.project_number
if bucket_project_number is None:
_LOGGER.warning(
'Bucket gs://%s does not have a project number. Skipping ownership
validation.',
bucket.name)
return
try:
project_number = _get_project_number(project_id, credentials=credentials)
except Exception as e:
_LOGGER.warning(
'Failed to resolve project number for project %s: %s. '
'Skipping bucket ownership validation.',
project_id,
e)
return
if bucket_project_number != project_number:
raise ValueError(
f'Bucket gs://{bucket.name} is not owned by project {project_id}.')
```
##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -77,6 +77,24 @@ def default_gcs_bucket_name(project, region):
region, md5(project.encode('utf8')).hexdigest())
+def _get_project_number(project_id, credentials=None):
+ """Resolves a project ID to its project number using Cloud Resource Manager
API."""
+ from google.cloud import resourcemanager_v3
+ client = resourcemanager_v3.ProjectsClient(credentials=credentials)
+ project_info = client.get_project(name=f"projects/{project_id}")
+ # project_info.name is of the form "projects/PROJECT_NUMBER"
+ return int(project_info.name.split('/')[-1])
Review Comment:

If `project_id` is already a project number (either as an integer or a
numeric string), we can return it directly and avoid making a redundant, slow
network call to the Cloud Resource Manager API.
```suggestion
def _get_project_number(project_id, credentials=None):
"""Resolves a project ID to its project number using Cloud Resource
Manager API."""
if isinstance(project_id, int):
return project_id
if isinstance(project_id, str) and project_id.isdigit():
return int(project_id)
from google.cloud import resourcemanager_v3
client = resourcemanager_v3.ProjectsClient(credentials=credentials)
project_info = client.get_project(name=f"projects/{project_id}")
# project_info.name is of the form "projects/PROJECT_NUMBER"
return int(project_info.name.split('/')[-1])
```
##########
sdks/python/apache_beam/io/gcp/gcsio.py:
##########
@@ -90,16 +108,18 @@ def get_or_create_default_gcs_bucket(options):
return None
bucket_name = default_gcs_bucket_name(project, region)
- bucket = GcsIO(pipeline_options=options).get_bucket(bucket_name)
+ gcs = GcsIO(pipeline_options=options)
+ bucket = gcs.get_bucket(bucket_name)
if bucket:
+ # Validate project ownership of the pre-existing bucket
+ _validate_bucket_project(bucket, project,
credentials=gcs.client._credentials)
Review Comment:

Accessing the private attribute `_credentials` directly from `gcs.client` is
risky because private attributes are not part of the public API and can change
or be removed in future versions of `google-cloud-storage`. Using `getattr`
with a fallback is safer.
```python
# Validate project ownership of the pre-existing bucket
_validate_bucket_project(
bucket,
project,
credentials=getattr(gcs.client, '_credentials', None))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]