gemini-code-assist[bot] commented on code in PR #38579:
URL: https://github.com/apache/beam/pull/38579#discussion_r3279784501
##########
.test-infra/metrics/sync/github/github_runs_prefetcher/code/main.py:
##########
@@ -148,6 +203,44 @@ def enhance_workflow(workflow):
print(f"No yaml file found for workflow: {workflow.name}")
+async def enrich_cancelled_schedule_run_jobs(workflows, semaphore, headers):
+ tasks = []
+ for workflow in workflows:
+ for run in workflow.runs:
+ if run.event != "schedule" or run.status != "cancelled":
+ continue
+ if run_duration(run.started_at, run.updated_at) >
CANCELLED_FAILURE_MIN_DURATION:
+ continue
+ jobs_url = (
+
f"https://api.github.com/repos/{GIT_ORG}/beam/actions/runs/{run.id}/jobs"
+ )
+ tasks.append(fetch_jobs_count(jobs_url, run, semaphore, headers))
+
+ if tasks:
+ await asyncio.gather(*tasks)
+
+
+@backoff.on_exception(backoff.constant, aiohttp.ClientResponseError,
max_tries=5)
+async def fetch_jobs_count(url, run, semaphore, headers):
+ async with semaphore:
+ async with aiohttp.ClientSession() as session:
+ async with session.get(url, headers=headers) as response:
+ if response.status == 200:
+ result = await response.json()
+ run.jobs_count = result.get("total_count", 0)
+ return
+ if response.status == 403:
+ print(f"Retry for: {url}")
+ headers["Authorization"] = get_token()
+ raise aiohttp.ClientResponseError(
+ response.request_info,
+ response.history,
+ status=response.status,
+ message=response.reason,
+ headers=response.headers,
+ )
Review Comment:

The `backoff` decorator should include a `giveup` condition to avoid
retrying on non-recoverable errors like 404 (Not Found). Also, the function
signature should be updated to accept a shared `session` to avoid creating one
per request.
```python
@backoff.on_exception(
backoff.constant,
aiohttp.ClientResponseError,
max_tries=5,
giveup=lambda e: e.status not in [403, 429, 500, 502, 503, 504]
)
async def fetch_jobs_count(session, url, run, semaphore, headers):
async with semaphore:
async with session.get(url, headers=headers) as response:
if response.status == 200:
result = await response.json()
run.jobs_count = result.get("total_count", 0)
return
if response.status == 403:
print(f"Retry for: {url}")
headers["Authorization"] = get_token()
raise aiohttp.ClientResponseError(
response.request_info,
response.history,
status=response.status,
message=response.reason,
headers=response.headers,
)
```
##########
.test-infra/metrics/sync/github/github_runs_prefetcher/code/main.py:
##########
@@ -148,6 +203,44 @@ def enhance_workflow(workflow):
print(f"No yaml file found for workflow: {workflow.name}")
+async def enrich_cancelled_schedule_run_jobs(workflows, semaphore, headers):
+ tasks = []
+ for workflow in workflows:
+ for run in workflow.runs:
+ if run.event != "schedule" or run.status != "cancelled":
+ continue
+ if run_duration(run.started_at, run.updated_at) >
CANCELLED_FAILURE_MIN_DURATION:
+ continue
+ jobs_url = (
+
f"https://api.github.com/repos/{GIT_ORG}/beam/actions/runs/{run.id}/jobs"
+ )
+ tasks.append(fetch_jobs_count(jobs_url, run, semaphore, headers))
+
+ if tasks:
+ await asyncio.gather(*tasks)
Review Comment:

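Creating a new `aiohttp.ClientSession` for every request is inefficient and
can lead to resource exhaustion. It is recommended to create a single session
and reuse it for all requests in the batch. Additionally, consider passing this
session to `fetch_jobs_count`. Note that the `asyncio.gather` call must stay
inside the `async with` block, because the queued coroutines only start running
when they are awaited and the session has to still be open at that point.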
```suggestion
async def enrich_cancelled_schedule_run_jobs(workflows, semaphore, headers):
tasks = []
async with aiohttp.ClientSession() as session:
for workflow in workflows:
for run in workflow.runs:
if run.event != "schedule" or run.status != "cancelled":
continue
if run_duration(run.started_at, run.updated_at) >
CANCELLED_FAILURE_MIN_DURATION:
continue
jobs_url = (
f"https://api.github.com/repos/{GIT_ORG}/beam/actions/runs/{run.id}/jobs"
)
tasks.append(fetch_jobs_count(session, jobs_url, run,
semaphore, headers))
if tasks:
await asyncio.gather(*tasks)
```
##########
.test-infra/metrics/sync/github/github_runs_prefetcher/code/main.py:
##########
@@ -426,11 +528,15 @@ def append_workflow_runs(workflow, runs):
page += 1
print("Successfully fetched workflow runs details")
- for workflow in list(workflows.values()):
+ workflows_list = list(workflows.values())
+ await enrich_cancelled_schedule_run_jobs(workflows_list, semaphore,
headers)
+ for workflow in workflows_list:
runs = sorted(workflow.runs, key=lambda r: r.started_at, reverse=True)
- workflow.runs = runs[: int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
+ workflow.runs = prepare_workflow_runs_for_flakiness(
+ runs[: int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
+ )
Review Comment:

It is more efficient to sort and slice the workflow runs before calling
`enrich_cancelled_schedule_run_jobs`. This avoids making unnecessary API calls
to fetch job counts for runs that will eventually be discarded by the slice.
```python
workflows_list = list(workflows.values())
for workflow in workflows_list:
workflow.runs = sorted(workflow.runs, key=lambda r: r.started_at,
reverse=True)[: int(GH_NUMBER_OF_WORKFLOW_RUNS_TO_FETCH)]
await enrich_cancelled_schedule_run_jobs(workflows_list, semaphore,
headers)
for workflow in workflows_list:
workflow.runs = prepare_workflow_runs_for_flakiness(workflow.runs)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]