This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/sbp by this push:
new 525fe16 Fix function ordering
525fe16 is described below
commit 525fe1610c037c7e2a05a7ee3a18a3f44be9fe1c
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Feb 3 16:11:45 2026 +0000
Fix function ordering
---
atr/get/sbom.py | 578 ++++++++++++++++++++++-----------------------
atr/shared/distribution.py | 333 +++++++++++++-------------
atr/shared/ignores.py | 52 ++--
atr/util.py | 2 +-
tests/unit/test_cache.py | 22 +-
tests/unit/test_ldap.py | 8 +-
tests/unit/test_mail.py | 282 +++++++++++-----------
tests/unit/test_user.py | 44 ++--
tests/unit/test_vote.py | 180 +++++++-------
9 files changed, 750 insertions(+), 751 deletions(-)
diff --git a/atr/get/sbom.py b/atr/get/sbom.py
index fc56c59..9a16a47 100644
--- a/atr/get/sbom.py
+++ b/atr/get/sbom.py
@@ -121,168 +121,6 @@ async def report(session: web.Committer, project: str, version: str, file_path:
return await template.blank("SBOM report", content=block.collect())
-async def _fetch_tasks(
- file_path: str, project: str, release: sql.Release, version: str
-) -> tuple[sql.Task | None, Sequence[sql.Task], Sequence[sql.Task]]:
- # TODO: Abstract this code and the sbomtool.MissingAdapter validators
- async with db.session() as data:
- via = sql.validate_instrumented_attribute
- tasks = (
- await data.task(
- project_name=project,
- version_name=version,
- revision_number=release.latest_revision_number,
- task_type=sql.TaskType.SBOM_TOOL_SCORE,
- primary_rel_path=file_path,
- )
- .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
- .all()
- )
- augment_tasks = (
- await data.task(
- project_name=project,
- version_name=version,
- task_type=sql.TaskType.SBOM_AUGMENT,
- primary_rel_path=file_path,
- )
- .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
- .all()
- )
- # Run or running scans for the current revision
- osv_tasks = (
- await data.task(
- project_name=project,
- version_name=version,
- task_type=sql.TaskType.SBOM_OSV_SCAN,
- primary_rel_path=file_path,
- revision_number=release.latest_revision_number,
- )
- .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
- .all()
- )
- return (tasks[0] if (len(tasks) > 0) else None), augment_tasks,
osv_tasks
-
-
-def _outdated_tool_section(block: htm.Block, task_result:
results.SBOMToolScore):
- block.h2["Outdated tools"]
- if task_result.outdated:
- outdated = []
- if isinstance(task_result.outdated, str):
- # Older version, only checked one tool
- outdated =
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(task_result.outdated))]
- elif isinstance(task_result.outdated, list):
- # Newer version, checked multiple tools
- outdated =
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(o)) for o in
task_result.outdated]
- if len(outdated) == 0:
- block.p["No outdated tools found."]
- for result in outdated:
- if result.kind == "tool":
- if "Apache Trusted Releases" in result.name:
- block.p[
- f"""The last version of ATR used on this SBOM was
- {result.used_version} but ATR is currently version
- {result.available_version}."""
- ]
- else:
- block.p[
- f"""The {result.name} is outdated. The used version is
- {result.used_version} and the available version is
- {result.available_version}."""
- ]
- else:
- if (result.kind == "missing_metadata") or (result.kind ==
"missing_timestamp"):
- # These both return without checking any further tools as
they prevent checking
- block.p[
- f"""There was a problem with the SBOM detected when
trying to
- determine if any tools were outdated:
- {result.kind.upper()}."""
- ]
- else:
- block.p[
- f"""There was a problem with the SBOM detected when
trying to
- determine if the {result.name} is outdated:
- {result.kind.upper()}."""
- ]
- else:
- block.p["No outdated tools found."]
-
-
-def _conformance_section(block: htm.Block, task_result: results.SBOMToolScore)
-> None:
- block.h2["Conformance report"]
- warnings =
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in
task_result.warnings]
- errors =
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in
task_result.errors]
- if warnings:
- block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"),
"Warnings"]
- _missing_table(block, warnings)
-
- if errors:
- block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
- _missing_table(block, errors)
-
- if not (warnings or errors):
- block.p["No NTIA 2021 minimum data field conformance warnings or
errors found."]
-
-
-def _license_section(block: htm.Block, task_result: results.SBOMToolScore) ->
None:
- block.h2["Licenses"]
- warnings = []
- errors = []
- prev_licenses = None
- if task_result.prev_licenses is not None:
- prev_licenses = _load_license_issues(task_result.prev_licenses)
- if task_result.license_warnings is not None:
- warnings = _load_license_issues(task_result.license_warnings)
- if task_result.license_errors is not None:
- errors = _load_license_issues(task_result.license_errors)
- # TODO: Rework the rendering of these since category in the table is
redundant.
- if warnings:
- block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"),
"Warnings"]
- _license_table(block, warnings, prev_licenses)
-
- if errors:
- block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
- _license_table(block, errors, prev_licenses)
-
- if not (warnings or errors):
- block.p["No license warnings or errors found."]
-
-
-def _load_license_issues(issues: list[str]) ->
list[sbom.models.licenses.Issue]:
- return [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in
issues]
-
-
-def _report_header(
- block: htm.Block, is_release_candidate: bool, release: sql.Release,
task_result: results.SBOMToolScore
-) -> None:
- block.p[
- """This is a report by the ATR SBOM tool, for debugging and
- informational purposes. Please use it only as an approximate
- guideline to the quality of your SBOM file."""
- ]
- if not is_release_candidate:
- block.p[
- "This report is for revision ",
htm.code[task_result.revision_number], "."
- ] # TODO: Mark if a subsequent score has failed
- elif release.phase == sql.ReleasePhase.RELEASE_CANDIDATE:
- block.p[f"This report is for the latest {release.version} release
candidate."]
-
-
-async def _report_task_results(block: htm.Block, task: sql.Task | None):
- if task is None:
- block.p["No SBOM score found."]
- return await template.blank("SBOM report", content=block.collect())
-
- task_status = task.status
- task_error = task.error
- if (task_status == sql.TaskStatus.QUEUED) or (task_status ==
sql.TaskStatus.ACTIVE):
- block.p["SBOM score is being computed."]
- return await template.blank("SBOM report", content=block.collect())
-
- if task_status == sql.TaskStatus.FAILED:
- block.p[f"SBOM score task failed: {task_error}"]
- return await template.blank("SBOM report", content=block.collect())
-
-
def _augment_section(
block: htm.Block,
release: sql.Release,
@@ -329,6 +167,41 @@ def _augment_section(
)
+def _cdx_to_osv(cdx: results.CdxVulnerabilityDetail) ->
results.VulnerabilityDetails:
+ score = []
+ severity = ""
+ if cdx.ratings is not None:
+ severity, score = sbom.utilities.cdx_severity_to_osv(cdx.ratings)
+ return results.VulnerabilityDetails(
+ id=cdx.id,
+ summary=cdx.description,
+ details=cdx.detail,
+ modified=cdx.updated or "",
+ published=cdx.published,
+ severity=score,
+ database_specific={"severity": severity},
+ references=[{"type": "WEB", "url": a.get("url", "")} for a in
cdx.advisories]
+ if (cdx.advisories is not None)
+ else [],
+ )
+
+
+def _conformance_section(block: htm.Block, task_result: results.SBOMToolScore)
-> None:
+ block.h2["Conformance report"]
+ warnings =
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(w)) for w in
task_result.warnings]
+ errors =
[sbom.models.conformance.MissingAdapter.validate_python(json.loads(e)) for e in
task_result.errors]
+ if warnings:
+ block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"),
"Warnings"]
+ _missing_table(block, warnings)
+
+ if errors:
+ block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
+ _missing_table(block, errors)
+
+ if not (warnings or errors):
+ block.p["No NTIA 2021 minimum data field conformance warnings or
errors found."]
+
+
def _cyclonedx_cli_errors(block: htm.Block, task_result:
results.SBOMToolScore):
block.h2["CycloneDX CLI validation errors"]
if task_result.cli_errors:
@@ -337,6 +210,12 @@ def _cyclonedx_cli_errors(block: htm.Block, task_result: results.SBOMToolScore):
block.p["No CycloneDX CLI validation errors found."]
+def _detail_table(components: list[str | None]):
+ return htm.table(".table.table-sm.table-bordered.table-striped")[
+ htm.tbody[[htm.tr[htm.td[comp]] for comp in components if comp is not
None]],
+ ]
+
+
def _extract_vulnerability_severity(vuln: results.VulnerabilityDetails) -> str:
"""Extract severity information from vulnerability data."""
data = vuln.database_specific or {}
@@ -352,6 +231,72 @@ def _extract_vulnerability_severity(vuln: results.VulnerabilityDetails) -> str:
return "Unknown"
+async def _fetch_tasks(
+ file_path: str, project: str, release: sql.Release, version: str
+) -> tuple[sql.Task | None, Sequence[sql.Task], Sequence[sql.Task]]:
+ # TODO: Abstract this code and the sbomtool.MissingAdapter validators
+ async with db.session() as data:
+ via = sql.validate_instrumented_attribute
+ tasks = (
+ await data.task(
+ project_name=project,
+ version_name=version,
+ revision_number=release.latest_revision_number,
+ task_type=sql.TaskType.SBOM_TOOL_SCORE,
+ primary_rel_path=file_path,
+ )
+ .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
+ .all()
+ )
+ augment_tasks = (
+ await data.task(
+ project_name=project,
+ version_name=version,
+ task_type=sql.TaskType.SBOM_AUGMENT,
+ primary_rel_path=file_path,
+ )
+ .order_by(sql.sqlmodel.desc(via(sql.Task.completed)))
+ .all()
+ )
+ # Run or running scans for the current revision
+ osv_tasks = (
+ await data.task(
+ project_name=project,
+ version_name=version,
+ task_type=sql.TaskType.SBOM_OSV_SCAN,
+ primary_rel_path=file_path,
+ revision_number=release.latest_revision_number,
+ )
+ .order_by(sql.sqlmodel.desc(via(sql.Task.added)))
+ .all()
+ )
+ return (tasks[0] if (len(tasks) > 0) else None), augment_tasks,
osv_tasks
+
+
+def _license_section(block: htm.Block, task_result: results.SBOMToolScore) ->
None:
+ block.h2["Licenses"]
+ warnings = []
+ errors = []
+ prev_licenses = None
+ if task_result.prev_licenses is not None:
+ prev_licenses = _load_license_issues(task_result.prev_licenses)
+ if task_result.license_warnings is not None:
+ warnings = _load_license_issues(task_result.license_warnings)
+ if task_result.license_errors is not None:
+ errors = _load_license_issues(task_result.license_errors)
+ # TODO: Rework the rendering of these since category in the table is
redundant.
+ if warnings:
+ block.h3[htm.icon("exclamation-triangle-fill", ".me-2.text-warning"),
"Warnings"]
+ _license_table(block, warnings, prev_licenses)
+
+ if errors:
+ block.h3[htm.icon("x-octagon-fill", ".me-2.text-danger"), "Errors"]
+ _license_table(block, errors, prev_licenses)
+
+ if not (warnings or errors):
+ block.p["No license warnings or errors found."]
+
+
def _license_table(
block: htm.Block,
items: list[sbom.models.licenses.Issue],
@@ -360,59 +305,18 @@ def _license_table(
warning_rows = [
htm.tr[
htm.td[
- f"Category {category!s}"
- if (len(components) == 0)
- else htm.details[htm.summary[f"Category {category!s}"],
htm.div[_detail_table(components)]]
- ],
- htm.td[f"{count!s} {f'({new!s} new, {updated!s} changed)' if (new
or updated) else ''}"],
- ]
- for category, count, new, updated, components in _license_tally(items,
prev)
- ]
- block.table(".table.table-sm.table-bordered.table-striped")[
- htm.thead[htm.tr[htm.th["License Category"], htm.th["Count"]]],
- htm.tbody[*warning_rows],
- ]
-
-
-def _missing_table(block: htm.Block, items:
list[sbom.models.conformance.Missing]) -> None:
- warning_rows = [
- htm.tr[
- htm.td[
- kind.upper()
- if (len(components) == 0)
- else htm.details[htm.summary[kind.upper()],
htm.div[_detail_table(components)]]
- ],
- htm.td[prop],
- htm.td[str(count)],
- ]
- for kind, prop, count, components in _missing_tally(items)
- ]
- block.table(".table.table-sm.table-bordered.table-striped")[
- htm.thead[htm.tr[htm.th["Kind"], htm.th["Property"], htm.th["Count"]]],
- htm.tbody[*warning_rows],
- ]
-
-
-def _detail_table(components: list[str | None]):
- return htm.table(".table.table-sm.table-bordered.table-striped")[
- htm.tbody[[htm.tr[htm.td[comp]] for comp in components if comp is not
None]],
- ]
-
-
-def _missing_tally(items: list[sbom.models.conformance.Missing]) ->
list[tuple[str, str, int, list[str | None]]]:
- counts: dict[tuple[str, str], int] = {}
- components: dict[tuple[str, str], list[str | None]] = {}
- for item in items:
- key = (getattr(item, "kind", ""), getattr(getattr(item, "property",
None), "name", ""))
- counts[key] = counts.get(key, 0) + 1
- if key not in components:
- components[key] = [str(item)]
- elif item.kind == "missing_component_property":
- components[key].append(str(item))
- return sorted(
- [(item, prop, count, components.get((item, prop), [])) for (item,
prop), count in counts.items()],
- key=lambda kv: (kv[0], kv[1]),
- )
+ f"Category {category!s}"
+ if (len(components) == 0)
+ else htm.details[htm.summary[f"Category {category!s}"],
htm.div[_detail_table(components)]]
+ ],
+ htm.td[f"{count!s} {f'({new!s} new, {updated!s} changed)' if (new
or updated) else ''}"],
+ ]
+ for category, count, new, updated, components in _license_tally(items,
prev)
+ ]
+ block.table(".table.table-sm.table-bordered.table-striped")[
+ htm.thead[htm.tr[htm.th["License Category"], htm.th["Count"]]],
+ htm.tbody[*warning_rows],
+ ]
# TODO: Update this to return either a block or something we can use later in
a block for styling reasons
@@ -457,6 +361,121 @@ def _license_tally(
)
+def _load_license_issues(issues: list[str]) ->
list[sbom.models.licenses.Issue]:
+ return [sbom.models.licenses.Issue.model_validate(json.loads(i)) for i in
issues]
+
+
+def _missing_table(block: htm.Block, items:
list[sbom.models.conformance.Missing]) -> None:
+ warning_rows = [
+ htm.tr[
+ htm.td[
+ kind.upper()
+ if (len(components) == 0)
+ else htm.details[htm.summary[kind.upper()],
htm.div[_detail_table(components)]]
+ ],
+ htm.td[prop],
+ htm.td[str(count)],
+ ]
+ for kind, prop, count, components in _missing_tally(items)
+ ]
+ block.table(".table.table-sm.table-bordered.table-striped")[
+ htm.thead[htm.tr[htm.th["Kind"], htm.th["Property"], htm.th["Count"]]],
+ htm.tbody[*warning_rows],
+ ]
+
+
+def _missing_tally(items: list[sbom.models.conformance.Missing]) ->
list[tuple[str, str, int, list[str | None]]]:
+ counts: dict[tuple[str, str], int] = {}
+ components: dict[tuple[str, str], list[str | None]] = {}
+ for item in items:
+ key = (getattr(item, "kind", ""), getattr(getattr(item, "property",
None), "name", ""))
+ counts[key] = counts.get(key, 0) + 1
+ if key not in components:
+ components[key] = [str(item)]
+ elif item.kind == "missing_component_property":
+ components[key].append(str(item))
+ return sorted(
+ [(item, prop, count, components.get((item, prop), [])) for (item,
prop), count in counts.items()],
+ key=lambda kv: (kv[0], kv[1]),
+ )
+
+
+def _outdated_tool_section(block: htm.Block, task_result:
results.SBOMToolScore):
+ block.h2["Outdated tools"]
+ if task_result.outdated:
+ outdated = []
+ if isinstance(task_result.outdated, str):
+ # Older version, only checked one tool
+ outdated =
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(task_result.outdated))]
+ elif isinstance(task_result.outdated, list):
+ # Newer version, checked multiple tools
+ outdated =
[sbom.models.tool.OutdatedAdapter.validate_python(json.loads(o)) for o in
task_result.outdated]
+ if len(outdated) == 0:
+ block.p["No outdated tools found."]
+ for result in outdated:
+ if result.kind == "tool":
+ if "Apache Trusted Releases" in result.name:
+ block.p[
+ f"""The last version of ATR used on this SBOM was
+ {result.used_version} but ATR is currently version
+ {result.available_version}."""
+ ]
+ else:
+ block.p[
+ f"""The {result.name} is outdated. The used version is
+ {result.used_version} and the available version is
+ {result.available_version}."""
+ ]
+ else:
+ if (result.kind == "missing_metadata") or (result.kind ==
"missing_timestamp"):
+ # These both return without checking any further tools as
they prevent checking
+ block.p[
+ f"""There was a problem with the SBOM detected when
trying to
+ determine if any tools were outdated:
+ {result.kind.upper()}."""
+ ]
+ else:
+ block.p[
+ f"""There was a problem with the SBOM detected when
trying to
+ determine if the {result.name} is outdated:
+ {result.kind.upper()}."""
+ ]
+ else:
+ block.p["No outdated tools found."]
+
+
+def _report_header(
+ block: htm.Block, is_release_candidate: bool, release: sql.Release,
task_result: results.SBOMToolScore
+) -> None:
+ block.p[
+ """This is a report by the ATR SBOM tool, for debugging and
+ informational purposes. Please use it only as an approximate
+ guideline to the quality of your SBOM file."""
+ ]
+ if not is_release_candidate:
+ block.p[
+ "This report is for revision ",
htm.code[task_result.revision_number], "."
+ ] # TODO: Mark if a subsequent score has failed
+ elif release.phase == sql.ReleasePhase.RELEASE_CANDIDATE:
+ block.p[f"This report is for the latest {release.version} release
candidate."]
+
+
+async def _report_task_results(block: htm.Block, task: sql.Task | None):
+ if task is None:
+ block.p["No SBOM score found."]
+ return await template.blank("SBOM report", content=block.collect())
+
+ task_status = task.status
+ task_error = task.error
+ if (task_status == sql.TaskStatus.QUEUED) or (task_status ==
sql.TaskStatus.ACTIVE):
+ block.p["SBOM score is being computed."]
+ return await template.blank("SBOM report", content=block.collect())
+
+ if task_status == sql.TaskStatus.FAILED:
+ block.p[f"SBOM score task failed: {task_error}"]
+ return await template.blank("SBOM report", content=block.collect())
+
+
def _severity_to_style(severity: str) -> str:
match severity.lower():
case "critical":
@@ -474,6 +493,15 @@ def _severity_to_style(severity: str) -> str:
return ".bg-info.text-light"
+def _update_worst_severity(severities: list[str], vuln_severity: str, worst:
int) -> int:
+ try:
+ sev_index = severities.index(vuln_severity)
+ except ValueError:
+ sev_index = 99
+ worst = min(worst, sev_index)
+ return worst
+
+
def _vulnerability_component_details_osv(
block: htm.Block,
component: results.OSVComponent,
@@ -546,64 +574,6 @@ def _vulnerability_component_details_osv(
return new
-def _update_worst_severity(severities: list[str], vuln_severity: str, worst:
int) -> int:
- try:
- sev_index = severities.index(vuln_severity)
- except ValueError:
- sev_index = 99
- worst = min(worst, sev_index)
- return worst
-
-
-def _vulnerability_scan_button(block: htm.Block) -> None:
- block.p["You can perform a new vulnerability scan."]
-
- form.render_block(
- block,
- model_cls=shared.sbom.ScanSBOMForm,
- submit_label="Scan file",
- empty=True,
- )
-
-
-def _vulnerability_scan_find_completed_task(osv_tasks: Sequence[sql.Task],
revision_number: str) -> sql.Task | None:
- """Find the most recent completed OSV scan task for the given revision."""
- for task in osv_tasks:
- if (task.status == sql.TaskStatus.COMPLETED) and (task.result is not
None):
- task_result = task.result
- if isinstance(task_result, results.SBOMOSVScan) and
(task_result.revision_number == revision_number):
- return task
- return None
-
-
-def _vulnerability_scan_find_in_progress_task(osv_tasks: Sequence[sql.Task],
revision_number: str) -> sql.Task | None:
- """Find the most recent in-progress OSV scan task for the given
revision."""
- for task in osv_tasks:
- if task.revision_number == revision_number:
- if task.status in (sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE,
sql.TaskStatus.FAILED):
- return task
- return None
-
-
-def _vulnerability_scan_results(
- block: htm.Block,
- vulns: list[results.CdxVulnerabilityDetail],
- scans: list[str],
- task: sql.Task | None,
- prev: list[results.CdxVulnerabilityDetail] | None,
-) -> None:
- previous_vulns = None
- if prev is not None:
- previous_osv = [
- (_cdx_to_osv(v), [a.get("ref", "") for a in v.affects] if
(v.affects is not None) else []) for v in prev
- ]
- previous_vulns = {v.id: (_extract_vulnerability_severity(v), a) for v,
a in previous_osv}
- if task is not None:
- _vulnerability_results_from_scan(task, block, previous_vulns)
- else:
- _vulnerability_results_from_bom(vulns, block, scans, previous_vulns)
-
-
def _vulnerability_results_from_bom(
vulns: list[results.CdxVulnerabilityDetail],
block: htm.Block,
@@ -676,25 +646,55 @@ def _vulnerability_results_from_scan(
block.append(new_block)
-def _cdx_to_osv(cdx: results.CdxVulnerabilityDetail) ->
results.VulnerabilityDetails:
- score = []
- severity = ""
- if cdx.ratings is not None:
- severity, score = sbom.utilities.cdx_severity_to_osv(cdx.ratings)
- return results.VulnerabilityDetails(
- id=cdx.id,
- summary=cdx.description,
- details=cdx.detail,
- modified=cdx.updated or "",
- published=cdx.published,
- severity=score,
- database_specific={"severity": severity},
- references=[{"type": "WEB", "url": a.get("url", "")} for a in
cdx.advisories]
- if (cdx.advisories is not None)
- else [],
+def _vulnerability_scan_button(block: htm.Block) -> None:
+ block.p["You can perform a new vulnerability scan."]
+
+ form.render_block(
+ block,
+ model_cls=shared.sbom.ScanSBOMForm,
+ submit_label="Scan file",
+ empty=True,
)
+def _vulnerability_scan_find_completed_task(osv_tasks: Sequence[sql.Task],
revision_number: str) -> sql.Task | None:
+ """Find the most recent completed OSV scan task for the given revision."""
+ for task in osv_tasks:
+ if (task.status == sql.TaskStatus.COMPLETED) and (task.result is not
None):
+ task_result = task.result
+ if isinstance(task_result, results.SBOMOSVScan) and
(task_result.revision_number == revision_number):
+ return task
+ return None
+
+
+def _vulnerability_scan_find_in_progress_task(osv_tasks: Sequence[sql.Task],
revision_number: str) -> sql.Task | None:
+ """Find the most recent in-progress OSV scan task for the given
revision."""
+ for task in osv_tasks:
+ if task.revision_number == revision_number:
+ if task.status in (sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE,
sql.TaskStatus.FAILED):
+ return task
+ return None
+
+
+def _vulnerability_scan_results(
+ block: htm.Block,
+ vulns: list[results.CdxVulnerabilityDetail],
+ scans: list[str],
+ task: sql.Task | None,
+ prev: list[results.CdxVulnerabilityDetail] | None,
+) -> None:
+ previous_vulns = None
+ if prev is not None:
+ previous_osv = [
+ (_cdx_to_osv(v), [a.get("ref", "") for a in v.affects] if
(v.affects is not None) else []) for v in prev
+ ]
+ previous_vulns = {v.id: (_extract_vulnerability_severity(v), a) for v,
a in previous_osv}
+ if task is not None:
+ _vulnerability_results_from_scan(task, block, previous_vulns)
+ else:
+ _vulnerability_results_from_bom(vulns, block, scans, previous_vulns)
+
+
def _vulnerability_scan_section(
block: htm.Block,
project: str,
diff --git a/atr/shared/distribution.py b/atr/shared/distribution.py
index 6e146c2..023bed3 100644
--- a/atr/shared/distribution.py
+++ b/atr/shared/distribution.py
@@ -32,6 +32,38 @@ import atr.models.sql as sql
import atr.util as util
from atr.storage import outcome
+# async def __json_from_maven_cdn(
+# self, api_url: str, group_id: str, artifact_id: str, version: str
+# ) -> outcome.Outcome[models.basic.JSON]:
+# import datetime
+#
+# try:
+# async with util.create_secure_session() as session:
+# async with session.get(api_url) as response:
+# response.raise_for_status()
+#
+# # Use current time as timestamp since we're just validating the
package exists
+# timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() *
1000)
+#
+# # Convert to dict matching MavenResponse structure
+# result_dict = {
+# "response": {
+# "start": 0,
+# "docs": [
+# {
+# "g": group_id,
+# "a": artifact_id,
+# "v": version,
+# "timestamp": timestamp_ms,
+# }
+# ],
+# }
+# }
+# result = models.basic.as_json(result_dict)
+# return outcome.Result(result)
+# except aiohttp.ClientError as e:
+# return outcome.Error(e)
+
class DistributionError(RuntimeError): ...
@@ -121,173 +153,6 @@ class DistributeForm(form.Form):
return self
-def html_submitted_values_table(block: htm.Block, dd: distribution.Data) ->
None:
- tbody = htm.tbody[
- html_tr("Platform", dd.platform.name),
- html_tr("Owner or Namespace", dd.owner_namespace or "-"),
- html_tr("Package", dd.package),
- html_tr("Version", dd.version),
- ]
- block.table(".table.table-striped.table-bordered")[tbody]
-
-
-def html_tr(label: str, value: str) -> htm.Element:
- return htm.tr[htm.th[label], htm.td[value]]
-
-
-def html_tr_a(label: str, value: str | None) -> htm.Element:
- return htm.tr[htm.th[label], htm.td[htm.a(href=value)[value] if value else
"-"]]
-
-
-async def json_from_distribution_platform(
- api_url: str, platform: sql.DistributionPlatform, version: str
-) -> outcome.Outcome[basic.JSON]:
- try:
- async with util.create_secure_session() as session:
- async with session.get(api_url) as response:
- response.raise_for_status()
- response_json = await response.json()
- result = basic.as_json(response_json)
- except aiohttp.ClientError as e:
- return outcome.Error(e)
- match platform:
- case sql.DistributionPlatform.NPM |
sql.DistributionPlatform.NPM_SCOPED:
- if version not in
distribution.NpmResponse.model_validate(result).time:
- e = DistributionError(f"Version '{version}' not found")
- return outcome.Error(e)
- return outcome.Result(result)
-
-
-async def release_validated(
- project: str, version: str, committee: bool = False, staging: bool | None
= None, release_policy: bool = False
-) -> sql.Release:
- match staging:
- case True:
- phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT}
- case False:
- phase = {sql.ReleasePhase.RELEASE_PREVIEW}
- case None:
- phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT,
sql.ReleasePhase.RELEASE_PREVIEW}
- async with db.session() as data:
- release = await data.release(
- project_name=project,
- version=version,
- _committee=committee,
- _release_policy=release_policy,
- ).demand(RuntimeError(f"Release {project} {version} not found"))
- if release.phase not in phase:
- raise RuntimeError(f"Release {project} {version} is not in
{phase}")
- # if release.project.status != sql.ProjectStatus.ACTIVE:
- # raise RuntimeError(f"Project {project} is not active")
- return release
-
-
-async def release_validated_and_committee(
- project: str, version: str, *, staging: bool | None = None,
release_policy: bool = False
-) -> tuple[sql.Release, sql.Committee]:
- release = await release_validated(project, version, committee=True,
staging=staging, release_policy=release_policy)
- committee = release.committee
- if committee is None:
- raise RuntimeError(f"Release {project} {version} has no committee")
- return release, committee
-
-
-# async def __json_from_maven_cdn(
-# self, api_url: str, group_id: str, artifact_id: str, version: str
-# ) -> outcome.Outcome[models.basic.JSON]:
-# import datetime
-#
-# try:
-# async with util.create_secure_session() as session:
-# async with session.get(api_url) as response:
-# response.raise_for_status()
-#
-# # Use current time as timestamp since we're just validating the
package exists
-# timestamp_ms = int(datetime.datetime.now(datetime.UTC).timestamp() *
1000)
-#
-# # Convert to dict matching MavenResponse structure
-# result_dict = {
-# "response": {
-# "start": 0,
-# "docs": [
-# {
-# "g": group_id,
-# "a": artifact_id,
-# "v": version,
-# "timestamp": timestamp_ms,
-# }
-# ],
-# }
-# }
-# result = models.basic.as_json(result_dict)
-# return outcome.Result(result)
-# except aiohttp.ClientError as e:
-# return outcome.Error(e)
-
-
-async def json_from_maven_xml(api_url: str, version: str) ->
outcome.Outcome[basic.JSON]:
- import datetime
- import xml.etree.ElementTree as ET
-
- try:
- async with util.create_secure_session() as session:
- async with session.get(api_url) as response:
- response.raise_for_status()
- xml_text = await response.text()
-
- # Parse the XML
- root = ET.fromstring(xml_text)
-
- # Extract versioning info
- group = root.find("groupId")
- artifact = root.find("artifactId")
- versioning = root.find("versioning")
- if versioning is None:
- e = DistributionError("No versioning element found in Maven
metadata")
- return outcome.Error(e)
-
- # Get lastUpdated timestamp (format: yyyyMMddHHmmss)
- last_updated_elem = versioning.find("lastUpdated")
- if (last_updated_elem is None) or (not last_updated_elem.text):
- e = DistributionError("No lastUpdated timestamp found in Maven
metadata")
- return outcome.Error(e)
-
- # Convert lastUpdated string to Unix timestamp in milliseconds
- last_updated_str = last_updated_elem.text
- dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S")
- dt = dt.replace(tzinfo=datetime.UTC)
- timestamp_ms = int(dt.timestamp() * 1000)
-
- # Verify the version exists
- versions_elem = versioning.find("versions")
- if versions_elem is not None:
- versions = [v.text for v in versions_elem.findall("version") if
v.text]
- if version not in versions:
- e = DistributionError(f"Version '{version}' not found in Maven
metadata")
- return outcome.Error(e)
-
- # Convert to dict matching MavenResponse structure
- result_dict = {
- "response": {
- "start": 0,
- "docs": [
- {
- "g": group.text if (group is not None) else "",
- "a": artifact.text if (artifact is not None) else "",
- "v": version,
- "timestamp": timestamp_ms,
- }
- ],
- }
- }
- result = basic.as_json(result_dict)
- return outcome.Result(result)
- except (aiohttp.ClientError, DistributionError) as e:
- return outcome.Error(e)
- except ET.ParseError as e:
- return outcome.Error(RuntimeError(f"Failed to parse Maven XML: {e}"))
-
-
def distribution_upload_date( # noqa: C901
platform: sql.DistributionPlatform,
data: basic.JSON,
@@ -393,6 +258,140 @@ def get_api_url(dd: distribution.Data, staging: bool |
None = None):
return api_url
+def html_submitted_values_table(block: htm.Block, dd: distribution.Data) ->
None:
+ tbody = htm.tbody[
+ html_tr("Platform", dd.platform.name),
+ html_tr("Owner or Namespace", dd.owner_namespace or "-"),
+ html_tr("Package", dd.package),
+ html_tr("Version", dd.version),
+ ]
+ block.table(".table.table-striped.table-bordered")[tbody]
+
+
+def html_tr(label: str, value: str) -> htm.Element:
+ return htm.tr[htm.th[label], htm.td[value]]
+
+
+def html_tr_a(label: str, value: str | None) -> htm.Element:
+ return htm.tr[htm.th[label], htm.td[htm.a(href=value)[value] if value else
"-"]]
+
+
+async def json_from_distribution_platform(
+ api_url: str, platform: sql.DistributionPlatform, version: str
+) -> outcome.Outcome[basic.JSON]:
+ try:
+ async with util.create_secure_session() as session:
+ async with session.get(api_url) as response:
+ response.raise_for_status()
+ response_json = await response.json()
+ result = basic.as_json(response_json)
+ except aiohttp.ClientError as e:
+ return outcome.Error(e)
+ match platform:
+ case sql.DistributionPlatform.NPM |
sql.DistributionPlatform.NPM_SCOPED:
+ if version not in
distribution.NpmResponse.model_validate(result).time:
+ e = DistributionError(f"Version '{version}' not found")
+ return outcome.Error(e)
+ return outcome.Result(result)
+
+
+async def json_from_maven_xml(api_url: str, version: str) ->
outcome.Outcome[basic.JSON]:
+ import datetime
+ import xml.etree.ElementTree as ET
+
+ try:
+ async with util.create_secure_session() as session:
+ async with session.get(api_url) as response:
+ response.raise_for_status()
+ xml_text = await response.text()
+
+ # Parse the XML
+ root = ET.fromstring(xml_text)
+
+ # Extract versioning info
+ group = root.find("groupId")
+ artifact = root.find("artifactId")
+ versioning = root.find("versioning")
+ if versioning is None:
+ e = DistributionError("No versioning element found in Maven
metadata")
+ return outcome.Error(e)
+
+ # Get lastUpdated timestamp (format: yyyyMMddHHmmss)
+ last_updated_elem = versioning.find("lastUpdated")
+ if (last_updated_elem is None) or (not last_updated_elem.text):
+ e = DistributionError("No lastUpdated timestamp found in Maven
metadata")
+ return outcome.Error(e)
+
+ # Convert lastUpdated string to Unix timestamp in milliseconds
+ last_updated_str = last_updated_elem.text
+ dt = datetime.datetime.strptime(last_updated_str, "%Y%m%d%H%M%S")
+ dt = dt.replace(tzinfo=datetime.UTC)
+ timestamp_ms = int(dt.timestamp() * 1000)
+
+ # Verify the version exists
+ versions_elem = versioning.find("versions")
+ if versions_elem is not None:
+ versions = [v.text for v in versions_elem.findall("version") if
v.text]
+ if version not in versions:
+ e = DistributionError(f"Version '{version}' not found in Maven
metadata")
+ return outcome.Error(e)
+
+ # Convert to dict matching MavenResponse structure
+ result_dict = {
+ "response": {
+ "start": 0,
+ "docs": [
+ {
+ "g": group.text if (group is not None) else "",
+ "a": artifact.text if (artifact is not None) else "",
+ "v": version,
+ "timestamp": timestamp_ms,
+ }
+ ],
+ }
+ }
+ result = basic.as_json(result_dict)
+ return outcome.Result(result)
+ except (aiohttp.ClientError, DistributionError) as e:
+ return outcome.Error(e)
+ except ET.ParseError as e:
+ return outcome.Error(RuntimeError(f"Failed to parse Maven XML: {e}"))
+
+
+async def release_validated(
+ project: str, version: str, committee: bool = False, staging: bool | None
= None, release_policy: bool = False
+) -> sql.Release:
+ match staging:
+ case True:
+ phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT}
+ case False:
+ phase = {sql.ReleasePhase.RELEASE_PREVIEW}
+ case None:
+ phase = {sql.ReleasePhase.RELEASE_CANDIDATE_DRAFT,
sql.ReleasePhase.RELEASE_PREVIEW}
+ async with db.session() as data:
+ release = await data.release(
+ project_name=project,
+ version=version,
+ _committee=committee,
+ _release_policy=release_policy,
+ ).demand(RuntimeError(f"Release {project} {version} not found"))
+ if release.phase not in phase:
+ raise RuntimeError(f"Release {project} {version} is not in
{phase}")
+ # if release.project.status != sql.ProjectStatus.ACTIVE:
+ # raise RuntimeError(f"Project {project} is not active")
+ return release
+
+
+async def release_validated_and_committee(
+ project: str, version: str, *, staging: bool | None = None,
release_policy: bool = False
+) -> tuple[sql.Release, sql.Committee]:
+ release = await release_validated(project, version, committee=True,
staging=staging, release_policy=release_policy)
+ committee = release.committee
+ if committee is None:
+ raise RuntimeError(f"Release {project} {version} has no committee")
+ return release, committee
+
+
def _template_url(
dd: distribution.Data,
staging: bool | None = None,
diff --git a/atr/shared/ignores.py b/atr/shared/ignores.py
index 347a133..2b41f66 100644
--- a/atr/shared/ignores.py
+++ b/atr/shared/ignores.py
@@ -40,32 +40,6 @@ class IgnoreStatus(enum.Enum):
WARNING = "Warning"
-def ignore_status_to_sql(status: IgnoreStatus | None) ->
sql.CheckResultStatusIgnore | None:
- """Convert wrapper enum to SQL enum."""
- if (status is None) or (status == IgnoreStatus.NO_STATUS):
- return None
- match status:
- case IgnoreStatus.EXCEPTION:
- return sql.CheckResultStatusIgnore.EXCEPTION
- case IgnoreStatus.FAILURE:
- return sql.CheckResultStatusIgnore.FAILURE
- case IgnoreStatus.WARNING:
- return sql.CheckResultStatusIgnore.WARNING
-
-
-def sql_to_ignore_status(status: sql.CheckResultStatusIgnore | None) ->
IgnoreStatus:
- """Convert SQL enum to wrapper enum."""
- if status is None:
- return IgnoreStatus.NO_STATUS
- match status:
- case sql.CheckResultStatusIgnore.EXCEPTION:
- return IgnoreStatus.EXCEPTION
- case sql.CheckResultStatusIgnore.FAILURE:
- return IgnoreStatus.FAILURE
- case sql.CheckResultStatusIgnore.WARNING:
- return IgnoreStatus.WARNING
-
-
class AddIgnoreForm(form.Form):
variant: ADD = form.value(ADD)
release_glob: str = form.label("Release pattern", default="")
@@ -154,6 +128,32 @@ type IgnoreForm = Annotated[
]
+def ignore_status_to_sql(status: IgnoreStatus | None) ->
sql.CheckResultStatusIgnore | None:
+ """Convert wrapper enum to SQL enum."""
+ if (status is None) or (status == IgnoreStatus.NO_STATUS):
+ return None
+ match status:
+ case IgnoreStatus.EXCEPTION:
+ return sql.CheckResultStatusIgnore.EXCEPTION
+ case IgnoreStatus.FAILURE:
+ return sql.CheckResultStatusIgnore.FAILURE
+ case IgnoreStatus.WARNING:
+ return sql.CheckResultStatusIgnore.WARNING
+
+
+def sql_to_ignore_status(status: sql.CheckResultStatusIgnore | None) ->
IgnoreStatus:
+ """Convert SQL enum to wrapper enum."""
+ if status is None:
+ return IgnoreStatus.NO_STATUS
+ match status:
+ case sql.CheckResultStatusIgnore.EXCEPTION:
+ return IgnoreStatus.EXCEPTION
+ case sql.CheckResultStatusIgnore.FAILURE:
+ return IgnoreStatus.FAILURE
+ case sql.CheckResultStatusIgnore.WARNING:
+ return IgnoreStatus.WARNING
+
+
def _validate_ignore_form_patterns(*patterns: str) -> None:
for pattern in patterns:
if not pattern:
diff --git a/atr/util.py b/atr/util.py
index f573995..8d19a6b 100644
--- a/atr/util.py
+++ b/atr/util.py
@@ -279,7 +279,7 @@ def cookie_pmcs_or_session_pmcs(session_data:
session.ClientSession) -> list[str
pmcs = cookie_pmcs()
if pmcs is None:
pmcs = session_data.committees
- if not isinstance(pmcs, list) or (not (all(isinstance(item, str) for item
in pmcs))):
+ if (not isinstance(pmcs, list)) or (not (all(isinstance(item, str) for
item in pmcs))):
raise TypeError("Session pmcs must be a list[str]")
return pmcs
diff --git a/tests/unit/test_cache.py b/tests/unit/test_cache.py
index 9990c8b..a165778 100644
--- a/tests/unit/test_cache.py
+++ b/tests/unit/test_cache.py
@@ -29,26 +29,26 @@ if TYPE_CHECKING:
from pytest import MonkeyPatch
-class _MockApp:
+class MockApp:
def __init__(self):
self.extensions: dict[str, object] = {}
-class _MockConfig:
+class MockConfig:
def __init__(self, state_dir: pathlib.Path):
self.STATE_DIR = str(state_dir)
@pytest.fixture
-def mock_app(monkeypatch: "MonkeyPatch") -> _MockApp:
- app = _MockApp()
+def mock_app(monkeypatch: "MonkeyPatch") -> MockApp:
+ app = MockApp()
monkeypatch.setattr("asfquart.APP", app)
return app
@pytest.fixture
def state_dir(tmp_path: pathlib.Path, monkeypatch: "MonkeyPatch") ->
pathlib.Path:
- monkeypatch.setattr("atr.config.get", lambda: _MockConfig(tmp_path))
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig(tmp_path))
return tmp_path
@@ -109,18 +109,18 @@ async def
test_admins_get_async_falls_back_to_file_without_app_context(
@pytest.mark.asyncio
-async def test_admins_get_async_uses_extensions_when_available(mock_app:
_MockApp):
+async def test_admins_get_async_uses_extensions_when_available(mock_app:
MockApp):
mock_app.extensions["admins"] = frozenset({"async_alice"})
result = await cache.admins_get_async()
assert result == frozenset({"async_alice"})
-def test_admins_get_returns_empty_frozenset_when_not_set(mock_app: _MockApp):
+def test_admins_get_returns_empty_frozenset_when_not_set(mock_app: MockApp):
result = cache.admins_get()
assert result == frozenset()
-def test_admins_get_returns_frozenset_from_extensions(mock_app: _MockApp):
+def test_admins_get_returns_frozenset_from_extensions(mock_app: MockApp):
mock_app.extensions["admins"] = frozenset({"alice", "bob"})
result = cache.admins_get()
assert result == frozenset({"alice", "bob"})
@@ -178,7 +178,7 @@ async def
test_admins_save_to_file_creates_parent_dirs(state_dir: pathlib.Path):
@pytest.mark.asyncio
async def test_admins_startup_load_calls_ldap_when_cache_missing(
- state_dir: pathlib.Path, mock_app: _MockApp, monkeypatch: "MonkeyPatch"
+ state_dir: pathlib.Path, mock_app: MockApp, monkeypatch: "MonkeyPatch"
):
ldap_called = False
@@ -199,7 +199,7 @@ async def
test_admins_startup_load_calls_ldap_when_cache_missing(
@pytest.mark.asyncio
async def test_admins_startup_load_handles_ldap_failure(
- state_dir: pathlib.Path, mock_app: _MockApp, monkeypatch: "MonkeyPatch"
+ state_dir: pathlib.Path, mock_app: MockApp, monkeypatch: "MonkeyPatch"
):
async def mock_fetch_admin_users() -> frozenset[str]:
raise ConnectionError("LDAP server unavailable")
@@ -215,7 +215,7 @@ async def test_admins_startup_load_handles_ldap_failure(
@pytest.mark.asyncio
async def test_admins_startup_load_uses_cache_when_present(
- state_dir: pathlib.Path, mock_app: _MockApp, monkeypatch: "MonkeyPatch"
+ state_dir: pathlib.Path, mock_app: MockApp, monkeypatch: "MonkeyPatch"
):
ldap_called = False
diff --git a/tests/unit/test_ldap.py b/tests/unit/test_ldap.py
index 43fdf8a..15f8844 100644
--- a/tests/unit/test_ldap.py
+++ b/tests/unit/test_ldap.py
@@ -27,12 +27,12 @@ if TYPE_CHECKING:
from pytest import MonkeyPatch
-class _MockApp:
+class MockApp:
def __init__(self):
self.extensions: dict[str, object] = {}
-class _MockConfig:
+class MockConfig:
def __init__(self, state_dir: pathlib.Path, ldap_bind_dn: str | None,
ldap_bind_password: str | None):
self.STATE_DIR = str(state_dir)
self.LDAP_BIND_DN = ldap_bind_dn
@@ -53,10 +53,10 @@ async def test_admins_startup_load_fetches_real_admins(
import atr.config as config
real_config = config.get()
- mock_config = _MockConfig(tmp_path, real_config.LDAP_BIND_DN,
real_config.LDAP_BIND_PASSWORD)
+ mock_config = MockConfig(tmp_path, real_config.LDAP_BIND_DN,
real_config.LDAP_BIND_PASSWORD)
monkeypatch.setattr("atr.config.get", lambda: mock_config)
- mock_app = _MockApp()
+ mock_app = MockApp()
monkeypatch.setattr("asfquart.APP", mock_app)
await cache.admins_startup_load()
diff --git a/tests/unit/test_mail.py b/tests/unit/test_mail.py
index 2f5f5e6..39a8339 100644
--- a/tests/unit/test_mail.py
+++ b/tests/unit/test_mail.py
@@ -31,107 +31,31 @@ if TYPE_CHECKING:
@pytest.mark.asyncio
-async def test_send_rejects_crlf_in_subject(monkeypatch: "MonkeyPatch") ->
None:
- """Test that CRLF injection in subject field is rejected."""
- # Mock _send_many to ensure we never actually send emails
+async def test_address_objects_used_for_from_to_headers(monkeypatch:
"MonkeyPatch") -> None:
+ """Test that Address objects are used for From/To headers."""
mock_send_many = AsyncMock(return_value=[])
monkeypatch.setattr("atr.mail._send_many", mock_send_many)
- # Create a malicious message with CRLF in the subject
- malicious_message = mail.Message(
+ legitimate_message = mail.Message(
email_sender="[email protected]",
email_recipient="[email protected]",
- subject="Legitimate Subject\r\nBcc: [email protected]",
- body="This is a test message",
- )
-
- # Call send and expect it to catch the injection
- _, errors = await mail.send(malicious_message)
-
- # Assert that the function returned an error
- assert len(errors) == 1
- assert "CRLF injection detected" in errors[0]
-
- # Assert that _send_many was never called (email was not sent)
- mock_send_many.assert_not_called()
-
-
[email protected]
-async def test_send_rejects_crlf_in_from_address(monkeypatch: "MonkeyPatch")
-> None:
- """Test that CRLF injection in from address field is rejected.
-
- Note: The from_addr validation happens before EmailMessage processing,
- so this test verifies the early validation layer also protects against
injection.
- """
- mock_send_many = AsyncMock(return_value=[])
- monkeypatch.setattr("atr.mail._send_many", mock_send_many)
-
- # Create a malicious message with CRLF in the from address
- malicious_message = mail.Message(
- email_sender="[email protected]\r\nBcc: [email protected]",
- email_recipient="[email protected]",
subject="Test Subject",
- body="This is a test message",
- )
-
- # Call send and expect it to raise ValueError due to invalid from_addr
format
- with pytest.raises(ValueError, match=r"from_addr must end with
@apache.org"):
- await mail.send(malicious_message)
-
- # Assert that _send_many was never called
- mock_send_many.assert_not_called()
-
-
[email protected]
-async def test_send_rejects_crlf_in_to_address(monkeypatch: "MonkeyPatch") ->
None:
- """Test that CRLF injection in to address field is rejected.
-
- Note: The _validate_recipient check happens before EmailMessage processing,
- so this test verifies the early validation layer also protects against
injection.
- """
- mock_send_many = AsyncMock(return_value=[])
- monkeypatch.setattr("atr.mail._send_many", mock_send_many)
-
- # Create a malicious message with CRLF in the to address
- malicious_message = mail.Message(
- email_sender="[email protected]",
- email_recipient="[email protected]\r\nBcc: [email protected]",
- subject="Test Subject",
- body="This is a test message",
+ body="Test body",
)
- # Call send and expect it to raise ValueError due to invalid recipient
format
- with pytest.raises(ValueError, match=r"Email recipient must be
@apache.org"):
- await mail.send(malicious_message)
-
- # Assert that _send_many was never called
- mock_send_many.assert_not_called()
-
-
[email protected]
-async def test_send_rejects_crlf_in_reply_to(monkeypatch: "MonkeyPatch") ->
None:
- """Test that CRLF injection in in_reply_to field is rejected."""
- mock_send_many = AsyncMock(return_value=[])
- monkeypatch.setattr("atr.mail._send_many", mock_send_many)
-
- # Create a malicious message with CRLF in the in_reply_to field
- malicious_message = mail.Message(
- email_sender="[email protected]",
- email_recipient="[email protected]",
- subject="Test Subject",
- body="This is a test message",
- in_reply_to="[email protected]\r\nBcc: [email protected]",
- )
+ _, errors = await mail.send(legitimate_message)
- # Call send and expect it to catch the injection
- _, errors = await mail.send(malicious_message)
+ # Verify the message was sent successfully
+ assert len(errors) == 0
+ mock_send_many.assert_called_once()
- # Assert that the function returned an error
- assert len(errors) == 1
- assert "CRLF injection detected" in errors[0]
+ # Verify the generated email bytes contain properly formatted addresses
+ call_args = mock_send_many.call_args
+ msg_text = call_args[0][2] # already a str
- # Assert that _send_many was never called
- mock_send_many.assert_not_called()
+ # Address objects format email addresses properly
+ assert "From: [email protected]" in msg_text
+ assert "To: [email protected]" in msg_text
@pytest.mark.asyncio
@@ -196,17 +120,72 @@ async def
test_send_accepts_message_with_reply_to(monkeypatch: "MonkeyPatch") ->
@pytest.mark.asyncio
-async def test_send_rejects_lf_only_injection(monkeypatch: "MonkeyPatch") ->
None:
- """Test that injection with LF only (\\n) is also rejected."""
+async def test_send_handles_non_ascii_headers(monkeypatch: "MonkeyPatch") ->
None:
+ """Test that non-ASCII characters in headers are handled correctly."""
mock_send_many = AsyncMock(return_value=[])
monkeypatch.setattr("atr.mail._send_many", mock_send_many)
- # Create a malicious message with just LF (no CR)
+ # Create a message with non-ASCII characters in the subject
+ message_with_unicode = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Test avec Accént",
+ body="This message has non-ASCII characters in the subject.",
+ )
+
+ # Call send
+ _mid, errors = await mail.send(message_with_unicode)
+
+ # Assert that no errors were returned
+ assert len(errors) == 0
+
+ # Assert that _send_many was called with a string (not bytes)
+ mock_send_many.assert_called_once()
+ call_args = mock_send_many.call_args
+ msg_text = call_args[0][2] # Third argument should be str
+ assert isinstance(msg_text, str)
+
+ # Verify the subject is present in the message
+ assert "Subject: Test avec Accént" in msg_text
+
+
[email protected]
+async def test_send_rejects_bcc_header_injection(monkeypatch: "MonkeyPatch")
-> None:
+ """Test a realistic Bcc header injection attack scenario."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message attempting to inject a Bcc header
malicious_message = mail.Message(
email_sender="[email protected]",
email_recipient="[email protected]",
- subject="Legitimate Subject\nBcc: [email protected]",
- body="This is a test message",
+ subject="Important Notice\r\nBcc:
[email protected]\r\nX-Priority: 1",
+ body="This message attempts to secretly copy an attacker.",
+ )
+
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
+
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
[email protected]
+async def test_send_rejects_content_type_injection(monkeypatch: "MonkeyPatch")
-> None:
+ """Test injection attempting to override Content-Type header."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message attempting to inject Content-Type
+ malicious_message = mail.Message(
+ email_sender="[email protected]",
+ email_recipient="[email protected]",
+ subject="Test\r\nContent-Type:
text/html\r\n\r\n<html><script>alert('XSS')</script></html>",
+ body="Normal body",
)
# Call send and expect it to catch the injection
@@ -246,17 +225,44 @@ async def
test_send_rejects_cr_only_injection(monkeypatch: "MonkeyPatch") -> Non
@pytest.mark.asyncio
-async def test_send_rejects_bcc_header_injection(monkeypatch: "MonkeyPatch")
-> None:
- """Test a realistic Bcc header injection attack scenario."""
+async def test_send_rejects_crlf_in_from_address(monkeypatch: "MonkeyPatch")
-> None:
+ """Test that CRLF injection in from address field is rejected.
+
+ Note: The from_addr validation happens before EmailMessage processing,
+ so this test verifies the early validation layer also protects against
injection.
+ """
mock_send_many = AsyncMock(return_value=[])
monkeypatch.setattr("atr.mail._send_many", mock_send_many)
- # Create a malicious message attempting to inject a Bcc header
+ # Create a malicious message with CRLF in the from address
+ malicious_message = mail.Message(
+ email_sender="[email protected]\r\nBcc: [email protected]",
+ email_recipient="[email protected]",
+ subject="Test Subject",
+ body="This is a test message",
+ )
+
+ # Call send and expect it to raise ValueError due to invalid from_addr
format
+ with pytest.raises(ValueError, match=r"from_addr must end with
@apache.org"):
+ await mail.send(malicious_message)
+
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
+
+
[email protected]
+async def test_send_rejects_crlf_in_reply_to(monkeypatch: "MonkeyPatch") ->
None:
+ """Test that CRLF injection in in_reply_to field is rejected."""
+ mock_send_many = AsyncMock(return_value=[])
+ monkeypatch.setattr("atr.mail._send_many", mock_send_many)
+
+ # Create a malicious message with CRLF in the in_reply_to field
malicious_message = mail.Message(
email_sender="[email protected]",
email_recipient="[email protected]",
- subject="Important Notice\r\nBcc:
[email protected]\r\nX-Priority: 1",
- body="This message attempts to secretly copy an attacker.",
+ subject="Test Subject",
+ body="This is a test message",
+ in_reply_to="[email protected]\r\nBcc: [email protected]",
)
# Call send and expect it to catch the injection
@@ -271,17 +277,18 @@ async def
test_send_rejects_bcc_header_injection(monkeypatch: "MonkeyPatch") ->
@pytest.mark.asyncio
-async def test_send_rejects_content_type_injection(monkeypatch: "MonkeyPatch")
-> None:
- """Test injection attempting to override Content-Type header."""
+async def test_send_rejects_crlf_in_subject(monkeypatch: "MonkeyPatch") ->
None:
+ """Test that CRLF injection in subject field is rejected."""
+ # Mock _send_many to ensure we never actually send emails
mock_send_many = AsyncMock(return_value=[])
monkeypatch.setattr("atr.mail._send_many", mock_send_many)
- # Create a malicious message attempting to inject Content-Type
+ # Create a malicious message with CRLF in the subject
malicious_message = mail.Message(
email_sender="[email protected]",
email_recipient="[email protected]",
- subject="Test\r\nContent-Type:
text/html\r\n\r\n<html><script>alert('XSS')</script></html>",
- body="Normal body",
+ subject="Legitimate Subject\r\nBcc: [email protected]",
+ body="This is a test message",
)
# Call send and expect it to catch the injection
@@ -291,66 +298,59 @@ async def
test_send_rejects_content_type_injection(monkeypatch: "MonkeyPatch") -
assert len(errors) == 1
assert "CRLF injection detected" in errors[0]
- # Assert that _send_many was never called
+ # Assert that _send_many was never called (email was not sent)
mock_send_many.assert_not_called()
@pytest.mark.asyncio
-async def test_address_objects_used_for_from_to_headers(monkeypatch:
"MonkeyPatch") -> None:
- """Test that Address objects are used for From/To headers."""
+async def test_send_rejects_crlf_in_to_address(monkeypatch: "MonkeyPatch") ->
None:
+ """Test that CRLF injection in to address field is rejected.
+
+ Note: The _validate_recipient check happens before EmailMessage processing,
+ so this test verifies the early validation layer also protects against
injection.
+ """
mock_send_many = AsyncMock(return_value=[])
monkeypatch.setattr("atr.mail._send_many", mock_send_many)
- legitimate_message = mail.Message(
+ # Create a malicious message with CRLF in the to address
+ malicious_message = mail.Message(
email_sender="[email protected]",
- email_recipient="[email protected]",
+ email_recipient="[email protected]\r\nBcc: [email protected]",
subject="Test Subject",
- body="Test body",
+ body="This is a test message",
)
- _, errors = await mail.send(legitimate_message)
-
- # Verify the message was sent successfully
- assert len(errors) == 0
- mock_send_many.assert_called_once()
-
- # Verify the generated email bytes contain properly formatted addresses
- call_args = mock_send_many.call_args
- msg_text = call_args[0][2] # already a str
+ # Call send and expect it to raise ValueError due to invalid recipient
format
+ with pytest.raises(ValueError, match=r"Email recipient must be
@apache.org"):
+ await mail.send(malicious_message)
- # Address objects format email addresses properly
- assert "From: [email protected]" in msg_text
- assert "To: [email protected]" in msg_text
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
@pytest.mark.asyncio
-async def test_send_handles_non_ascii_headers(monkeypatch: "MonkeyPatch") ->
None:
- """Test that non-ASCII characters in headers are handled correctly."""
+async def test_send_rejects_lf_only_injection(monkeypatch: "MonkeyPatch") ->
None:
+ """Test that injection with LF only (\\n) is also rejected."""
mock_send_many = AsyncMock(return_value=[])
monkeypatch.setattr("atr.mail._send_many", mock_send_many)
- # Create a message with non-ASCII characters in the subject
- message_with_unicode = mail.Message(
+ # Create a malicious message with just LF (no CR)
+ malicious_message = mail.Message(
email_sender="[email protected]",
email_recipient="[email protected]",
- subject="Test avec Accént",
- body="This message has non-ASCII characters in the subject.",
+ subject="Legitimate Subject\nBcc: [email protected]",
+ body="This is a test message",
)
- # Call send
- _mid, errors = await mail.send(message_with_unicode)
-
- # Assert that no errors were returned
- assert len(errors) == 0
+ # Call send and expect it to catch the injection
+ _, errors = await mail.send(malicious_message)
- # Assert that _send_many was called with a string (not bytes)
- mock_send_many.assert_called_once()
- call_args = mock_send_many.call_args
- msg_text = call_args[0][2] # Third argument should be str
- assert isinstance(msg_text, str)
+ # Assert that the function returned an error
+ assert len(errors) == 1
+ assert "CRLF injection detected" in errors[0]
- # Verify the subject is present in the message
- assert "Subject: Test avec Accént" in msg_text
+ # Assert that _send_many was never called
+ mock_send_many.assert_not_called()
def test_smtp_policy_vs_smtputf8() -> None:
diff --git a/tests/unit/test_user.py b/tests/unit/test_user.py
index 268b22e..23c93ce 100644
--- a/tests/unit/test_user.py
+++ b/tests/unit/test_user.py
@@ -25,83 +25,83 @@ if TYPE_CHECKING:
from pytest import MonkeyPatch
-class _MockApp:
+class MockApp:
def __init__(self):
self.extensions: dict[str, object] = {}
-class _MockConfig:
+class MockConfig:
def __init__(self, allow_tests: bool = False, admin_users_additional: str
= ""):
self.ALLOW_TESTS = allow_tests
self.ADMIN_USERS_ADDITIONAL = admin_users_additional
@pytest.fixture
-def mock_app(monkeypatch: "MonkeyPatch") -> _MockApp:
- app = _MockApp()
+def mock_app(monkeypatch: "MonkeyPatch") -> MockApp:
+ app = MockApp()
monkeypatch.setattr("asfquart.APP", app)
return app
@pytest.mark.asyncio
-async def test_is_admin_async_returns_false_for_none(mock_app: _MockApp,
monkeypatch: "MonkeyPatch"):
- monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+async def test_is_admin_async_returns_false_for_none(mock_app: MockApp,
monkeypatch: "MonkeyPatch"):
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig())
mock_app.extensions["admins"] = frozenset()
assert await user.is_admin_async(None) is False
@pytest.mark.asyncio
-async def test_is_admin_async_returns_true_for_cached_admin(mock_app:
_MockApp, monkeypatch: "MonkeyPatch"):
+async def test_is_admin_async_returns_true_for_cached_admin(mock_app: MockApp,
monkeypatch: "MonkeyPatch"):
user._get_additional_admin_users.cache_clear()
- monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig())
mock_app.extensions["admins"] = frozenset({"async_admin"})
assert await user.is_admin_async("async_admin") is True
@pytest.mark.asyncio
-async def test_is_admin_async_returns_true_for_test_user(mock_app: _MockApp,
monkeypatch: "MonkeyPatch"):
+async def test_is_admin_async_returns_true_for_test_user(mock_app: MockApp,
monkeypatch: "MonkeyPatch"):
user._get_additional_admin_users.cache_clear()
- monkeypatch.setattr("atr.config.get", lambda:
_MockConfig(allow_tests=True))
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig(allow_tests=True))
mock_app.extensions["admins"] = frozenset()
assert await user.is_admin_async("test") is True
-def test_is_admin_returns_false_for_none(mock_app: _MockApp, monkeypatch:
"MonkeyPatch"):
- monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+def test_is_admin_returns_false_for_none(mock_app: MockApp, monkeypatch:
"MonkeyPatch"):
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig())
mock_app.extensions["admins"] = frozenset()
assert user.is_admin(None) is False
-def test_is_admin_returns_false_for_test_user_when_not_allowed(mock_app:
_MockApp, monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_false_for_test_user_when_not_allowed(mock_app:
MockApp, monkeypatch: "MonkeyPatch"):
user._get_additional_admin_users.cache_clear()
- monkeypatch.setattr("atr.config.get", lambda:
_MockConfig(allow_tests=False))
+ monkeypatch.setattr("atr.config.get", lambda:
MockConfig(allow_tests=False))
mock_app.extensions["admins"] = frozenset()
assert user.is_admin("test") is False
-def test_is_admin_returns_false_for_unknown_user(mock_app: _MockApp,
monkeypatch: "MonkeyPatch"):
- monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+def test_is_admin_returns_false_for_unknown_user(mock_app: MockApp,
monkeypatch: "MonkeyPatch"):
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig())
mock_app.extensions["admins"] = frozenset({"alice", "bob"})
assert user.is_admin("nobody") is False
-def test_is_admin_returns_true_for_additional_admin(mock_app: _MockApp,
monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_true_for_additional_admin(mock_app: MockApp,
monkeypatch: "MonkeyPatch"):
user._get_additional_admin_users.cache_clear()
- monkeypatch.setattr("atr.config.get", lambda:
_MockConfig(admin_users_additional="alice,bob"))
+ monkeypatch.setattr("atr.config.get", lambda:
MockConfig(admin_users_additional="alice,bob"))
mock_app.extensions["admins"] = frozenset()
assert user.is_admin("alice") is True
assert user.is_admin("bob") is True
-def test_is_admin_returns_true_for_cached_admin(mock_app: _MockApp,
monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_true_for_cached_admin(mock_app: MockApp,
monkeypatch: "MonkeyPatch"):
user._get_additional_admin_users.cache_clear()
- monkeypatch.setattr("atr.config.get", lambda: _MockConfig())
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig())
mock_app.extensions["admins"] = frozenset({"cached_admin"})
assert user.is_admin("cached_admin") is True
-def test_is_admin_returns_true_for_test_user_when_allowed(mock_app: _MockApp,
monkeypatch: "MonkeyPatch"):
+def test_is_admin_returns_true_for_test_user_when_allowed(mock_app: MockApp,
monkeypatch: "MonkeyPatch"):
user._get_additional_admin_users.cache_clear()
- monkeypatch.setattr("atr.config.get", lambda:
_MockConfig(allow_tests=True))
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig(allow_tests=True))
mock_app.extensions["admins"] = frozenset()
assert user.is_admin("test") is True
diff --git a/tests/unit/test_vote.py b/tests/unit/test_vote.py
index 7d7da0e..f204c9e 100644
--- a/tests/unit/test_vote.py
+++ b/tests/unit/test_vote.py
@@ -56,6 +56,27 @@ def mock_app(monkeypatch: MonkeyPatch) -> MockApp:
return app
+def test_admin_not_on_pmc_has_non_binding_vote(mock_app: MockApp, monkeypatch:
MonkeyPatch) -> None:
+ """Admins who are not PMC members do not get binding votes."""
+ user._get_additional_admin_users.cache_clear()
+ monkeypatch.setattr("atr.config.get", lambda: MockConfig())
+ mock_app.extensions["admins"] = frozenset({"admin_user"})
+
+ assert user.is_admin("admin_user") is True
+
+ committee = MockCommittee(
+ committee_members=["alice", "bob"],
+ committers=["alice", "bob", "charlie"],
+ )
+ # admin_user is not on this PMC
+ is_pmc_member = user.is_committee_member(committee, "admin_user") # type:
ignore[arg-type]
+ assert is_pmc_member is False
+
+ # Binding is determined ONLY by PMC membership, not admin status
+ is_binding = is_pmc_member
+ assert is_binding is False
+
+
def test_binding_vote_body_format() -> None:
"""Binding votes include '(binding)' in the email body."""
body = vote_writer.format_vote_email_body(
@@ -67,78 +88,55 @@ def test_binding_vote_body_format() -> None:
assert body == "+1 (binding) (pmcmember) PMC Member"
-def test_non_binding_vote_body_format() -> None:
- """Non-binding votes do not include '(binding)' in the email body."""
+def test_binding_vote_with_comment() -> None:
+ """Binding votes with comments include the comment and signature."""
body = vote_writer.format_vote_email_body(
vote="+1",
- asf_uid="contributor",
- fullname="A Contributor",
- is_binding=False,
- )
- assert body == "+1 (contributor) A Contributor"
-
-
-def test_negative_binding_vote_body_format() -> None:
- """Negative binding votes are formatted correctly."""
- body = vote_writer.format_vote_email_body(
- vote="-1",
asf_uid="pmcmember",
fullname="PMC Member",
is_binding=True,
+ comment="Verified signatures and checksums. Tests pass.",
)
- assert body == "-1 (binding) (pmcmember) PMC Member"
-
-
-def test_zero_binding_vote_body_format() -> None:
- """Zero binding votes are formatted correctly."""
- body = vote_writer.format_vote_email_body(
- vote="0",
- asf_uid="voter",
- fullname="Abstaining Voter",
- is_binding=True,
+ expected = (
+ "+1 (binding) (pmcmember) PMC Member\n\n"
+ "Verified signatures and checksums. Tests pass.\n\n"
+ "-- \nPMC Member (pmcmember)"
)
- assert body == "0 (binding) (voter) Abstaining Voter"
+ assert body == expected
-def test_zero_non_binding_vote_body_format() -> None:
- """Zero non-binding votes are formatted correctly."""
- body = vote_writer.format_vote_email_body(
- vote="0",
- asf_uid="voter",
- fullname="Abstaining Voter",
- is_binding=False,
+def test_committer_non_pmc_has_non_binding_vote() -> None:
+ """Committers who are not PMC members have non-binding votes."""
+ committee = MockCommittee(
+ committee_members=["alice", "bob", "charlie"],
+ committers=["alice", "bob", "charlie", "dave", "eve"],
)
- assert body == "0 (voter) Abstaining Voter"
+ is_binding = user.is_committee_member(committee, "dave") # type: ignore[arg-type]
+ assert is_binding is False
-def test_binding_vote_with_comment() -> None:
- """Binding votes with comments include the comment and signature."""
+def test_empty_comment_no_signature() -> None:
+ """Empty comments do not add a signature."""
body = vote_writer.format_vote_email_body(
vote="+1",
asf_uid="pmcmember",
fullname="PMC Member",
is_binding=True,
- comment="Verified signatures and checksums. Tests pass.",
- )
- expected = (
- "+1 (binding) (pmcmember) PMC Member\n\n"
- "Verified signatures and checksums. Tests pass.\n\n"
- "-- \nPMC Member (pmcmember)"
+ comment="",
)
- assert body == expected
+ assert body == "+1 (binding) (pmcmember) PMC Member"
+ assert "-- \n" not in body
-def test_non_binding_vote_with_comment() -> None:
- """Non-binding votes with comments include the comment and signature."""
+def test_negative_binding_vote_body_format() -> None:
+ """Negative binding votes are formatted correctly."""
body = vote_writer.format_vote_email_body(
- vote="+1",
- asf_uid="contributor",
- fullname="A Contributor",
- is_binding=False,
- comment="Looks good to me!",
+ vote="-1",
+ asf_uid="pmcmember",
+ fullname="PMC Member",
+ is_binding=True,
)
- expected = "+1 (contributor) A Contributor\n\nLooks good to me!\n\n-- \nA Contributor (contributor)"
- assert body == expected
+ assert body == "-1 (binding) (pmcmember) PMC Member"
def test_negative_vote_with_comment() -> None:
@@ -158,37 +156,28 @@ def test_negative_vote_with_comment() -> None:
assert body == expected
-def test_empty_comment_no_signature() -> None:
- """Empty comments do not add a signature."""
+def test_non_binding_vote_body_format() -> None:
+ """Non-binding votes do not include '(binding)' in the email body."""
body = vote_writer.format_vote_email_body(
vote="+1",
- asf_uid="pmcmember",
- fullname="PMC Member",
- is_binding=True,
- comment="",
- )
- assert body == "+1 (binding) (pmcmember) PMC Member"
- assert "-- \n" not in body
-
-
-def test_pmc_member_has_binding_vote() -> None:
- """PMC members have binding votes."""
- committee = MockCommittee(
- committee_members=["alice", "bob", "charlie"],
- committers=["alice", "bob", "charlie", "dave", "eve"],
+ asf_uid="contributor",
+ fullname="A Contributor",
+ is_binding=False,
)
- is_binding = user.is_committee_member(committee, "alice") # type: ignore[arg-type]
- assert is_binding is True
+ assert body == "+1 (contributor) A Contributor"
-def test_committer_non_pmc_has_non_binding_vote() -> None:
- """Committers who are not PMC members have non-binding votes."""
- committee = MockCommittee(
- committee_members=["alice", "bob", "charlie"],
- committers=["alice", "bob", "charlie", "dave", "eve"],
+def test_non_binding_vote_with_comment() -> None:
+ """Non-binding votes with comments include the comment and signature."""
+ body = vote_writer.format_vote_email_body(
+ vote="+1",
+ asf_uid="contributor",
+ fullname="A Contributor",
+ is_binding=False,
+ comment="Looks good to me!",
)
- is_binding = user.is_committee_member(committee, "dave") # type: ignore[arg-type]
- assert is_binding is False
+ expected = "+1 (contributor) A Contributor\n\nLooks good to me!\n\n-- \nA Contributor (contributor)"
+ assert body == expected
def test_non_committer_has_non_binding_vote() -> None:
@@ -207,22 +196,33 @@ def test_none_committee_returns_false() -> None:
assert is_binding is False
-def test_admin_not_on_pmc_has_non_binding_vote(mock_app: MockApp, monkeypatch: MonkeyPatch) -> None:
- """Admins who are not PMC members do not get binding votes."""
- user._get_additional_admin_users.cache_clear()
- monkeypatch.setattr("atr.config.get", lambda: MockConfig())
- mock_app.extensions["admins"] = frozenset({"admin_user"})
+def test_pmc_member_has_binding_vote() -> None:
+ """PMC members have binding votes."""
+ committee = MockCommittee(
+ committee_members=["alice", "bob", "charlie"],
+ committers=["alice", "bob", "charlie", "dave", "eve"],
+ )
+ is_binding = user.is_committee_member(committee, "alice") # type: ignore[arg-type]
+ assert is_binding is True
- assert user.is_admin("admin_user") is True
- committee = MockCommittee(
- committee_members=["alice", "bob"],
- committers=["alice", "bob", "charlie"],
+def test_zero_binding_vote_body_format() -> None:
+ """Zero binding votes are formatted correctly."""
+ body = vote_writer.format_vote_email_body(
+ vote="0",
+ asf_uid="voter",
+ fullname="Abstaining Voter",
+ is_binding=True,
)
- # admin_user is not on this PMC
- is_pmc_member = user.is_committee_member(committee, "admin_user") # type: ignore[arg-type]
- assert is_pmc_member is False
+ assert body == "0 (binding) (voter) Abstaining Voter"
- # Binding is determined ONLY by PMC membership, not admin status
- is_binding = is_pmc_member
- assert is_binding is False
+
+def test_zero_non_binding_vote_body_format() -> None:
+ """Zero non-binding votes are formatted correctly."""
+ body = vote_writer.format_vote_email_body(
+ vote="0",
+ asf_uid="voter",
+ fullname="Abstaining Voter",
+ is_binding=False,
+ )
+ assert body == "0 (voter) Abstaining Voter"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]